1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use bytes::BytesMut;
8use futures::future::try_join_all;
9use tokio::io::AsyncWriteExt as _;
10use tokio::time::Instant;
11use tokio::try_join;
12use tokio_util::codec::Encoder;
13use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
14use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
15use wasmtime::error::Context as _;
16use wasmtime::{bail, ensure, AsContextMut, Engine, StoreContextMut};
17use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
18
19use crate::rpc::Error;
20use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
21
22#[instrument(level = "trace", skip_all)]
24pub fn link_item<V>(
25 engine: &Engine,
26 linker: &mut LinkerInstance<V>,
27 guest_resources: impl Into<Arc<[ResourceType]>>,
28 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
29 ty: types::ComponentItem,
30 instance: impl Into<Arc<str>>,
31 name: impl Into<Arc<str>>,
32) -> wasmtime::Result<()>
33where
34 V: WrpcView,
35{
36 let instance = instance.into();
37 let guest_resources = guest_resources.into();
38 let host_resources = host_resources.into();
39 match ty {
40 types::ComponentItem::ComponentFunc(ty) => {
41 let name = name.into();
42 debug!(?instance, ?name, "linking function");
43 link_function(
44 linker,
45 Arc::clone(&guest_resources),
46 Arc::clone(&host_resources),
47 ty,
48 instance,
49 name,
50 )?;
51 }
52 types::ComponentItem::CoreFunc(_) => {
53 bail!("polyfilling core functions not supported yet")
54 }
55 types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
56 types::ComponentItem::Component(ty) => {
57 for (name, ty) in ty.imports(engine) {
58 debug!(?instance, name, "linking component item");
59 link_item(
60 engine,
61 linker,
62 Arc::clone(&guest_resources),
63 Arc::clone(&host_resources),
64 ty,
65 "",
66 name,
67 )?;
68 }
69 }
70 types::ComponentItem::ComponentInstance(ty) => {
71 let name = name.into();
72 let mut linker = linker
73 .instance(&name)
74 .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
75 debug!(?instance, ?name, "linking instance");
76 link_instance(
77 engine,
78 &mut linker,
79 guest_resources,
80 host_resources,
81 ty,
82 name,
83 )?;
84 }
85 types::ComponentItem::Type(_) => {}
86 types::ComponentItem::Resource(ty) => {
87 let name = name.into();
88 let Some((guest_ty, host_ty)) = host_resources
89 .get(&*instance)
90 .and_then(|instance| instance.get(&*name))
91 else {
92 bail!("resource type for {instance}/{name} not defined");
93 };
94 ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
95
96 debug!(?instance, ?name, "linking resource");
97 linker.resource(&name, *host_ty, |_, _| Ok(()))?;
98 }
99 }
100 Ok(())
101}
102
103#[instrument(level = "trace", skip_all)]
105pub fn link_instance<V>(
106 engine: &Engine,
107 linker: &mut LinkerInstance<V>,
108 guest_resources: impl Into<Arc<[ResourceType]>>,
109 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
110 ty: types::ComponentInstance,
111 name: impl Into<Arc<str>>,
112) -> wasmtime::Result<()>
113where
114 V: WrpcView,
115{
116 let instance = name.into();
117 let guest_resources = guest_resources.into();
118 let host_resources = host_resources.into();
119 for (name, ty) in ty.exports(engine) {
120 debug!(name, "linking instance item");
121 link_item(
122 engine,
123 linker,
124 Arc::clone(&guest_resources),
125 Arc::clone(&host_resources),
126 ty,
127 Arc::clone(&instance),
128 name,
129 )?;
130 }
131 Ok(())
132}
133
134#[allow(clippy::too_many_arguments)]
135async fn invoke<T: WrpcView>(
136 mut store: &mut StoreContextMut<'_, T>,
137 params: &[Val],
138 results: &mut [Val],
139 guest_resources: Arc<[ResourceType]>,
140 params_ty: impl IntoIterator<Item = (&str, Type)>,
141 results_ty: impl IntoIterator<Item = Type>,
142 instance: Arc<str>,
143 name: Arc<str>,
144) -> wasmtime::Result<anyhow::Result<()>> {
145 let mut buf = BytesMut::default();
146 let mut deferred = vec![];
147 for (v, (name, ref ty)) in zip(params, params_ty) {
148 let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources, &[]);
149 enc.encode(v, &mut buf)
150 .with_context(|| format!("failed to encode parameter `{name}`"))?;
151 deferred.push(enc.deferred);
152 }
153 let view = store.data_mut().wrpc();
154 let clt = view.ctx.client();
155 let cx = view.ctx.context();
156 let timeout = view.ctx.timeout();
157 let buf = buf.freeze();
158 let paths = &[[]; 0];
160 let rpc_name = rpc_func_name(&name);
161 let start = Instant::now();
162 let invocation = if let Some(timeout) = timeout {
163 clt.timeout(timeout)
164 .invoke(cx, &instance, rpc_name, buf, paths)
165 .await
166 } else {
167 clt.invoke(cx, &instance, rpc_name, buf, paths).await
168 };
169 let (outgoing, incoming) = match invocation {
170 Ok((outgoing, incoming)) => (outgoing, incoming),
171 Err(err) => {
172 return Ok(Err(err.context(format!(
173 "failed to invoke `{instance}.{name}` polyfill via wRPC"
174 ))))
175 }
176 };
177 let tx = async {
178 try_join_all(
179 zip(0.., deferred)
180 .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
181 .map(|(w, f)| async move {
182 let w = w.map_err(wasmtime::Error::from_anyhow)?;
183 f(w).await
184 }),
185 )
186 .await
187 .context("failed to write asynchronous parameters")?;
188 let mut outgoing = pin!(outgoing);
189 outgoing
190 .flush()
191 .await
192 .context("failed to flush outgoing stream")?;
193 if let Err(err) = outgoing.shutdown().await {
194 trace!(?err, "failed to shutdown outgoing stream");
195 }
196 wasmtime::error::Ok(())
197 };
198 let rx = async {
199 let mut incoming = pin!(incoming);
200 for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
201 read_value(
202 &mut store,
203 &mut incoming,
204 &guest_resources,
205 &[],
206 v,
207 ty,
208 &[i],
209 )
210 .await
211 .with_context(|| format!("failed to decode return value {i}"))?;
212 }
213 wasmtime::error::Ok(())
214 };
215 let res = if let Some(timeout) = timeout {
216 let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
217 try_join!(
218 async {
219 tokio::time::timeout(timeout, tx)
220 .await
221 .context("data transmission timed out")?
222 },
223 async {
224 tokio::time::timeout(timeout, rx)
225 .await
226 .context("data receipt timed out")?
227 },
228 )
229 } else {
230 try_join!(tx, rx)
231 };
232 match res {
233 Ok(((), ())) => Ok(Ok(())),
234 Err(err) => Ok(Err(err.into())),
235 }
236}
237
238#[instrument(level = "trace", skip_all)]
240pub fn link_function<V>(
241 linker: &mut LinkerInstance<V>,
242 guest_resources: impl Into<Arc<[ResourceType]>>,
243 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
244 ty: types::ComponentFunc,
245 instance: impl Into<Arc<str>>,
246 name: impl Into<Arc<str>>,
247) -> wasmtime::Result<()>
248where
249 V: WrpcView,
250{
251 let span = Span::current();
252 let instance = instance.into();
253 let name = name.into();
254 let guest_resources = guest_resources.into();
255 let host_resources = host_resources.into();
256 match rpc_result_type(&host_resources, ty.results()) {
257 None => linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
258 let instance = Arc::clone(&instance);
259 let name = Arc::clone(&name);
260 let resources = Arc::clone(&guest_resources);
261 Box::new(
262 async move {
263 match invoke(
264 &mut store,
265 params,
266 results,
267 resources,
268 ty.params(),
269 ty.results(),
270 instance,
271 name,
272 )
273 .await
274 {
275 Ok(Ok(())) => Ok(()),
276 Ok(Err(err)) => Err(wasmtime::Error::from_anyhow(err)),
277 Err(err) => Err(err),
278 }
279 }
280 .instrument(span.clone()),
281 )
282 }),
283 Some(None) => {
285 linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
286 let instance = Arc::clone(&instance);
287 let name = Arc::clone(&name);
288 let resources = Arc::clone(&guest_resources);
289 Box::new(
290 async move {
291 let [result] = results else {
292 bail!("result type mismatch");
293 };
294 match invoke(
295 &mut store,
296 params,
297 &mut [],
298 resources,
299 ty.params(),
300 None,
301 instance,
302 name,
303 )
304 .await?
305 {
306 Ok(()) => {
307 *result = Val::Result(Ok(None));
308 }
309 Err(err) => {
310 let err = store.data_mut().push_error(Error::Invoke(err))?;
311 let err = err
312 .try_into_resource_any(&mut store)
313 .context("failed to lower error resource")?;
314 *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
315 }
316 }
317 Ok(())
318 }
319 .instrument(span.clone()),
320 )
321 })
322 }
323 Some(Some(result_ty)) => {
325 linker.func_new_async(&Arc::clone(&name), move |mut store, ty, params, results| {
326 let instance = Arc::clone(&instance);
327 let name = Arc::clone(&name);
328 let resources = Arc::clone(&guest_resources);
329 let result_ty = result_ty.clone();
330 Box::new(
331 async move {
332 let [result] = results else {
333 bail!("result type mismatch");
334 };
335 let mut ok = [Val::Bool(false); 1];
336 match invoke(
337 &mut store,
338 params,
339 ok.as_mut_slice(),
340 resources,
341 ty.params(),
342 [result_ty],
343 instance,
344 name,
345 )
346 .await?
347 {
348 Ok(()) => {
349 let [ok] = ok;
350 *result = Val::Result(Ok(Some(Box::new(ok))));
351 }
352 Err(err) => {
353 let err = store.data_mut().push_error(Error::Invoke(err))?;
354 let err = err
355 .try_into_resource_any(&mut store)
356 .context("failed to lower error resource")?;
357 *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
358 }
359 }
360 Ok(())
361 }
362 .instrument(span.clone()),
363 )
364 })
365 }
366 }
367}