1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use anyhow::{bail, ensure, Context as _};
8use bytes::BytesMut;
9use futures::future::try_join_all;
10use tokio::io::AsyncWriteExt as _;
11use tokio::time::Instant;
12use tokio::try_join;
13use tokio_util::codec::Encoder;
14use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
15use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
16use wasmtime::{AsContextMut, Engine, StoreContextMut};
17use wasmtime_wasi::WasiView;
18use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
19
20use crate::rpc::Error;
21use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
22
23#[instrument(level = "trace", skip_all)]
25pub fn link_item<V>(
26 engine: &Engine,
27 linker: &mut LinkerInstance<V>,
28 guest_resources: impl Into<Arc<[ResourceType]>>,
29 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
30 ty: types::ComponentItem,
31 instance: impl Into<Arc<str>>,
32 name: impl Into<Arc<str>>,
33) -> wasmtime::Result<()>
34where
35 V: WasiView + WrpcView,
36{
37 let instance = instance.into();
38 let guest_resources = guest_resources.into();
39 let host_resources = host_resources.into();
40 match ty {
41 types::ComponentItem::ComponentFunc(ty) => {
42 let name = name.into();
43 debug!(?instance, ?name, "linking function");
44 link_function(
45 linker,
46 Arc::clone(&guest_resources),
47 Arc::clone(&host_resources),
48 ty,
49 instance,
50 name,
51 )?;
52 }
53 types::ComponentItem::CoreFunc(_) => {
54 bail!("polyfilling core functions not supported yet")
55 }
56 types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
57 types::ComponentItem::Component(ty) => {
58 for (name, ty) in ty.imports(engine) {
59 debug!(?instance, name, "linking component item");
60 link_item(
61 engine,
62 linker,
63 Arc::clone(&guest_resources),
64 Arc::clone(&host_resources),
65 ty,
66 "",
67 name,
68 )?;
69 }
70 }
71 types::ComponentItem::ComponentInstance(ty) => {
72 let name = name.into();
73 let mut linker = linker
74 .instance(&name)
75 .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
76 debug!(?instance, ?name, "linking instance");
77 link_instance(
78 engine,
79 &mut linker,
80 guest_resources,
81 host_resources,
82 ty,
83 name,
84 )?;
85 }
86 types::ComponentItem::Type(_) => {}
87 types::ComponentItem::Resource(ty) => {
88 let name = name.into();
89 let Some((guest_ty, host_ty)) = host_resources
90 .get(&*instance)
91 .and_then(|instance| instance.get(&*name))
92 else {
93 bail!("resource type for {instance}/{name} not defined");
94 };
95 ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
96
97 debug!(?instance, ?name, "linking resource");
98 linker.resource(&name, *host_ty, |_, _| Ok(()))?;
99 }
100 }
101 Ok(())
102}
103
104#[instrument(level = "trace", skip_all)]
106pub fn link_instance<V>(
107 engine: &Engine,
108 linker: &mut LinkerInstance<V>,
109 guest_resources: impl Into<Arc<[ResourceType]>>,
110 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
111 ty: types::ComponentInstance,
112 name: impl Into<Arc<str>>,
113) -> wasmtime::Result<()>
114where
115 V: WrpcView + WasiView,
116{
117 let instance = name.into();
118 let guest_resources = guest_resources.into();
119 let host_resources = host_resources.into();
120 for (name, ty) in ty.exports(engine) {
121 debug!(name, "linking instance item");
122 link_item(
123 engine,
124 linker,
125 Arc::clone(&guest_resources),
126 Arc::clone(&host_resources),
127 ty,
128 Arc::clone(&instance),
129 name,
130 )?;
131 }
132 Ok(())
133}
134
135#[allow(clippy::too_many_arguments)]
136async fn invoke<T: WrpcView + WasiView>(
137 mut store: &mut StoreContextMut<'_, T>,
138 params: &[Val],
139 results: &mut [Val],
140 guest_resources: Arc<[ResourceType]>,
141 params_ty: impl IntoIterator<Item = (&str, Type)>,
142 results_ty: impl IntoIterator<Item = Type>,
143 instance: Arc<str>,
144 name: Arc<str>,
145) -> wasmtime::Result<anyhow::Result<()>> {
146 let mut buf = BytesMut::default();
147 let mut deferred = vec![];
148 for (v, (name, ref ty)) in zip(params, params_ty) {
149 let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources);
150 enc.encode(v, &mut buf)
151 .with_context(|| format!("failed to encode parameter `{name}`"))?;
152 deferred.push(enc.deferred);
153 }
154 let clt = store.data().client();
155 let cx = store.data().context();
156 let timeout = store.data().timeout();
157 let buf = buf.freeze();
158 let paths = &[[]; 0];
160 let rpc_name = rpc_func_name(&name);
161 let start = Instant::now();
162 let invocation = if let Some(timeout) = timeout {
163 clt.timeout(timeout)
164 .invoke(cx, &instance, rpc_name, buf, paths)
165 .await
166 } else {
167 clt.invoke(cx, &instance, rpc_name, buf, paths).await
168 }
169 .with_context(|| format!("failed to invoke `{instance}.{name}` polyfill via wRPC"));
170 let (outgoing, incoming) = match invocation {
171 Ok((outgoing, incoming)) => (outgoing, incoming),
172 Err(err) => return Ok(Err(err)),
173 };
174 let tx = async {
175 try_join_all(
176 zip(0.., deferred)
177 .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
178 .map(|(w, f)| async move {
179 let w = w?;
180 f(w).await
181 }),
182 )
183 .await
184 .context("failed to write asynchronous parameters")?;
185 let mut outgoing = pin!(outgoing);
186 outgoing
187 .flush()
188 .await
189 .context("failed to flush outgoing stream")?;
190 if let Err(err) = outgoing.shutdown().await {
191 trace!(?err, "failed to shutdown outgoing stream");
192 }
193 anyhow::Ok(())
194 };
195 let rx = async {
196 let mut incoming = pin!(incoming);
197 for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
198 read_value(&mut store, &mut incoming, &guest_resources, v, ty, &[i])
199 .await
200 .with_context(|| format!("failed to decode return value {i}"))?;
201 }
202 Ok(())
203 };
204 let res = if let Some(timeout) = timeout {
205 let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
206 try_join!(
207 async {
208 tokio::time::timeout(timeout, tx)
209 .await
210 .context("data transmission timed out")?
211 },
212 async {
213 tokio::time::timeout(timeout, rx)
214 .await
215 .context("data receipt timed out")?
216 },
217 )
218 } else {
219 try_join!(tx, rx)
220 };
221 match res {
222 Ok(((), ())) => Ok(Ok(())),
223 Err(err) => Ok(Err(err)),
224 }
225}
226
227#[instrument(level = "trace", skip_all)]
229pub fn link_function<V>(
230 linker: &mut LinkerInstance<V>,
231 guest_resources: impl Into<Arc<[ResourceType]>>,
232 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
233 ty: types::ComponentFunc,
234 instance: impl Into<Arc<str>>,
235 name: impl Into<Arc<str>>,
236) -> wasmtime::Result<()>
237where
238 V: WrpcView + WasiView,
239{
240 let span = Span::current();
241 let instance = instance.into();
242 let name = name.into();
243 let guest_resources = guest_resources.into();
244 let host_resources = host_resources.into();
245 match rpc_result_type(&host_resources, ty.results()) {
246 None => linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
247 let ty = ty.clone();
248 let instance = Arc::clone(&instance);
249 let name = Arc::clone(&name);
250 let resources = Arc::clone(&guest_resources);
251 Box::new(
252 async move {
253 match invoke(
254 &mut store,
255 params,
256 results,
257 resources,
258 ty.params(),
259 ty.results(),
260 instance,
261 name,
262 )
263 .await
264 {
265 Ok(Ok(())) => Ok(()),
266 Ok(Err(err)) => Err(err),
267 Err(err) => Err(err),
268 }
269 }
270 .instrument(span.clone()),
271 )
272 }),
273 Some(None) => {
275 linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
276 let ty = ty.clone();
277 let instance = Arc::clone(&instance);
278 let name = Arc::clone(&name);
279 let resources = Arc::clone(&guest_resources);
280 Box::new(
281 async move {
282 let [result] = results else {
283 bail!("result type mismatch");
284 };
285 match invoke(
286 &mut store,
287 params,
288 &mut [],
289 resources,
290 ty.params(),
291 None,
292 instance,
293 name,
294 )
295 .await?
296 {
297 Ok(()) => {
298 *result = Val::Result(Ok(None));
299 }
300 Err(err) => {
301 let err = store.data_mut().push_error(Error::Invoke(err))?;
302 let err = err
303 .try_into_resource_any(&mut store)
304 .context("failed to lower error resource")?;
305 *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
306 }
307 }
308 Ok(())
309 }
310 .instrument(span.clone()),
311 )
312 })
313 }
314 Some(Some(result_ty)) => {
316 linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
317 let ty = ty.clone();
318 let instance = Arc::clone(&instance);
319 let name = Arc::clone(&name);
320 let resources = Arc::clone(&guest_resources);
321 let result_ty = result_ty.clone();
322 Box::new(
323 async move {
324 let [result] = results else {
325 bail!("result type mismatch");
326 };
327 let mut ok = [Val::Bool(false); 1];
328 match invoke(
329 &mut store,
330 params,
331 ok.as_mut_slice(),
332 resources,
333 ty.params(),
334 [result_ty],
335 instance,
336 name,
337 )
338 .await?
339 {
340 Ok(()) => {
341 let [ok] = ok;
342 *result = Val::Result(Ok(Some(Box::new(ok))));
343 }
344 Err(err) => {
345 let err = store.data_mut().push_error(Error::Invoke(err))?;
346 let err = err
347 .try_into_resource_any(&mut store)
348 .context("failed to lower error resource")?;
349 *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
350 }
351 }
352 Ok(())
353 }
354 .instrument(span.clone()),
355 )
356 })
357 }
358 }
359}