Skip to main content

wrpc_runtime_wasmtime/
serve.rs

1use core::future::Future;
2use core::pin::Pin;
3
4use std::{collections::HashMap, sync::Arc};
5
6use anyhow::Context as _;
7use futures::{Stream, TryStreamExt as _};
8use tokio::sync::Mutex;
9use tracing::{debug, instrument, Instrument as _, Span};
10use wasmtime::component::types;
11use wasmtime::component::{Instance, InstancePre, ResourceType};
12use wasmtime::AsContextMut;
13use wasmtime_wasi::WasiView;
14
15use crate::{call, rpc_func_name, WrpcView};
16
17pub trait ServeExt: wrpc_transport::Serve {
18    /// Serve [`types::ComponentFunc`] from an [`InstancePre`] instantiating it on each call.
19    /// This serving method does not support guest-exported resources.
20    #[instrument(level = "trace", skip(self, store, instance_pre, host_resources))]
21    fn serve_function<T>(
22        &self,
23        store: impl Fn() -> wasmtime::Store<T> + Send + 'static,
24        instance_pre: InstancePre<T>,
25        host_resources: impl Into<
26            Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
27        >,
28        ty: types::ComponentFunc,
29        instance_name: &str,
30        name: &str,
31    ) -> impl Future<
32        Output = anyhow::Result<
33            impl Stream<
34                    Item = anyhow::Result<(
35                        Self::Context,
36                        Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
37                    )>,
38                > + Send
39                + 'static,
40        >,
41    > + Send
42    where
43        T: WasiView + WrpcView + 'static,
44    {
45        let span = Span::current();
46        let host_resources = host_resources.into();
47        async move {
48            debug!(instance = instance_name, name, "serving function export");
49            let component_ty = instance_pre.component();
50            let idx = if instance_name.is_empty() {
51                None
52            } else {
53                let idx = component_ty
54                    .get_export_index(None, instance_name)
55                    .with_context(|| format!("export `{instance_name}` not found"))?;
56                Some(idx)
57            };
58            let idx = component_ty
59                .get_export_index(idx.as_ref(), name)
60                .with_context(|| format!("export `{name}` not found"))?;
61
62            let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
63            let io_streams: Arc<[ResourceType]> = crate::paths::wasi_io_stream_resources(
64                component_ty.engine(),
65                &component_ty.component_type(),
66            )
67            .into();
68            let paths = crate::paths::params_async_paths(params_ty.iter(), &io_streams);
69            let invocations = self
70                .serve(instance_name, rpc_func_name(name), paths)
71                .await?;
72            let name = Arc::<str>::from(name);
73            let results_ty: Arc<[_]> = ty.results().collect();
74            let host_resources = Arc::clone(&host_resources);
75            Ok(invocations.map_ok(move |(cx, tx, rx)| {
76                let instance_pre = instance_pre.clone();
77                let name = Arc::clone(&name);
78                let params_ty = Arc::clone(&params_ty);
79                let results_ty = Arc::clone(&results_ty);
80                let host_resources = Arc::clone(&host_resources);
81                let io_streams = Arc::clone(&io_streams);
82
83                let mut store = store();
84                (
85                    cx,
86                    Box::pin(
87                        async move {
88                            let instance = instance_pre
89                                .instantiate_async(&mut store)
90                                .await
91                                .map_err(anyhow::Error::from)
92                                .context("failed to instantiate component")?;
93                            let func = instance
94                                .get_func(&mut store, idx)
95                                .with_context(|| format!("function export `{name}` not found"))?;
96                            call(
97                                &mut store,
98                                rx,
99                                tx,
100                                &[],
101                                &host_resources,
102                                &io_streams,
103                                params_ty.iter(),
104                                &results_ty,
105                                func,
106                            )
107                            .await?;
108                            Ok(())
109                        }
110                        .instrument(span.clone()),
111                    ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
112                )
113            }))
114        }
115    }
116
117    /// Like [`Self::serve_function`], but with a shared `store` instance.
118    /// This is required to allow for serving functions, which operate on guest-exported resources.
119    #[instrument(
120        level = "trace",
121        skip(self, store, instance, guest_resources, host_resources)
122    )]
123    #[allow(clippy::too_many_arguments)]
124    fn serve_function_shared<T>(
125        &self,
126        store: Arc<Mutex<wasmtime::Store<T>>>,
127        instance: Instance,
128        guest_resources: impl Into<Arc<[ResourceType]>>,
129        host_resources: impl Into<
130            Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
131        >,
132        io_stream_resources: impl Into<Arc<[ResourceType]>>,
133        ty: types::ComponentFunc,
134        instance_name: &str,
135        name: &str,
136    ) -> impl Future<
137        Output = anyhow::Result<
138            impl Stream<
139                    Item = anyhow::Result<(
140                        Self::Context,
141                        Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
142                    )>,
143                > + Send
144                + 'static,
145        >,
146    > + Send
147    where
148        T: WasiView + WrpcView + 'static,
149    {
150        let span = Span::current();
151        let guest_resources = guest_resources.into();
152        let host_resources = host_resources.into();
153        let io_stream_resources = io_stream_resources.into();
154        async move {
155            let func = {
156                let mut store = store.lock().await;
157                let idx = if instance_name.is_empty() {
158                    None
159                } else {
160                    let idx = instance
161                        .get_export_index(store.as_context_mut(), None, instance_name)
162                        .with_context(|| format!("export `{instance_name}` not found"))?;
163                    Some(idx)
164                };
165                let idx = instance
166                    .get_export_index(store.as_context_mut(), idx.as_ref(), name)
167                    .with_context(|| format!("export `{name}` not found"))?;
168                instance.get_func(store.as_context_mut(), idx)
169            }
170            .with_context(|| format!("function export `{name}` not found"))?;
171            debug!(instance = instance_name, name, "serving function export");
172            let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
173            let paths = crate::paths::params_async_paths(params_ty.iter(), &io_stream_resources);
174            let invocations = self
175                .serve(instance_name, rpc_func_name(name), paths)
176                .await?;
177            let results_ty: Arc<[_]> = ty.results().collect();
178            let guest_resources = Arc::clone(&guest_resources);
179            let host_resources = Arc::clone(&host_resources);
180            Ok(invocations.map_ok(move |(cx, tx, rx)| {
181                let params_ty = Arc::clone(&params_ty);
182                let results_ty = Arc::clone(&results_ty);
183                let guest_resources = Arc::clone(&guest_resources);
184                let host_resources = Arc::clone(&host_resources);
185                let io_stream_resources = Arc::clone(&io_stream_resources);
186                let store = Arc::clone(&store);
187                (
188                    cx,
189                    Box::pin(
190                        async move {
191                            let mut store = store.lock().await;
192                            call(
193                                &mut *store,
194                                rx,
195                                tx,
196                                &guest_resources,
197                                &host_resources,
198                                &io_stream_resources,
199                                params_ty.iter(),
200                                &results_ty,
201                                func,
202                            )
203                            .await?;
204                            Ok(())
205                        }
206                        .instrument(span.clone()),
207                    ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
208                )
209            }))
210        }
211    }
212}
213
214impl<T: wrpc_transport::Serve> ServeExt for T {}