1use core::future::Future;
2use core::pin::Pin;
3
4use std::{collections::HashMap, sync::Arc};
5
6use anyhow::Context as _;
7use futures::{Stream, TryStreamExt as _};
8use tokio::sync::Mutex;
9use tracing::{debug, instrument, Instrument as _, Span};
10use wasmtime::component::types;
11use wasmtime::component::{Instance, InstancePre, ResourceType};
12use wasmtime::AsContextMut;
13use wasmtime_wasi::WasiView;
14
15use crate::{call, rpc_func_name, WrpcView};
16
17pub trait ServeExt: wrpc_transport::Serve {
18 #[instrument(level = "trace", skip(self, store, instance_pre, host_resources))]
21 fn serve_function<T>(
22 &self,
23 store: impl Fn() -> wasmtime::Store<T> + Send + 'static,
24 instance_pre: InstancePre<T>,
25 host_resources: impl Into<
26 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
27 >,
28 ty: types::ComponentFunc,
29 instance_name: &str,
30 name: &str,
31 ) -> impl Future<
32 Output = anyhow::Result<
33 impl Stream<
34 Item = anyhow::Result<(
35 Self::Context,
36 Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
37 )>,
38 > + Send
39 + 'static,
40 >,
41 > + Send
42 where
43 T: WasiView + WrpcView + 'static,
44 {
45 let span = Span::current();
46 let host_resources = host_resources.into();
47 async move {
48 debug!(instance = instance_name, name, "serving function export");
49 let component_ty = instance_pre.component();
50 let idx = if instance_name.is_empty() {
51 None
52 } else {
53 let idx = component_ty
54 .get_export_index(None, instance_name)
55 .with_context(|| format!("export `{instance_name}` not found"))?;
56 Some(idx)
57 };
58 let idx = component_ty
59 .get_export_index(idx.as_ref(), name)
60 .with_context(|| format!("export `{name}` not found"))?;
61
62 let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
63 let io_streams: Arc<[ResourceType]> = crate::paths::wasi_io_stream_resources(
64 component_ty.engine(),
65 &component_ty.component_type(),
66 )
67 .into();
68 let paths = crate::paths::params_async_paths(params_ty.iter(), &io_streams);
69 let invocations = self
70 .serve(instance_name, rpc_func_name(name), paths)
71 .await?;
72 let name = Arc::<str>::from(name);
73 let results_ty: Arc<[_]> = ty.results().collect();
74 let host_resources = Arc::clone(&host_resources);
75 Ok(invocations.map_ok(move |(cx, tx, rx)| {
76 let instance_pre = instance_pre.clone();
77 let name = Arc::clone(&name);
78 let params_ty = Arc::clone(¶ms_ty);
79 let results_ty = Arc::clone(&results_ty);
80 let host_resources = Arc::clone(&host_resources);
81 let io_streams = Arc::clone(&io_streams);
82
83 let mut store = store();
84 (
85 cx,
86 Box::pin(
87 async move {
88 let instance = instance_pre
89 .instantiate_async(&mut store)
90 .await
91 .map_err(anyhow::Error::from)
92 .context("failed to instantiate component")?;
93 let func = instance
94 .get_func(&mut store, idx)
95 .with_context(|| format!("function export `{name}` not found"))?;
96 call(
97 &mut store,
98 rx,
99 tx,
100 &[],
101 &host_resources,
102 &io_streams,
103 params_ty.iter(),
104 &results_ty,
105 func,
106 )
107 .await?;
108 Ok(())
109 }
110 .instrument(span.clone()),
111 ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
112 )
113 }))
114 }
115 }
116
117 #[instrument(
120 level = "trace",
121 skip(self, store, instance, guest_resources, host_resources)
122 )]
123 #[allow(clippy::too_many_arguments)]
124 fn serve_function_shared<T>(
125 &self,
126 store: Arc<Mutex<wasmtime::Store<T>>>,
127 instance: Instance,
128 guest_resources: impl Into<Arc<[ResourceType]>>,
129 host_resources: impl Into<
130 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
131 >,
132 io_stream_resources: impl Into<Arc<[ResourceType]>>,
133 ty: types::ComponentFunc,
134 instance_name: &str,
135 name: &str,
136 ) -> impl Future<
137 Output = anyhow::Result<
138 impl Stream<
139 Item = anyhow::Result<(
140 Self::Context,
141 Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
142 )>,
143 > + Send
144 + 'static,
145 >,
146 > + Send
147 where
148 T: WasiView + WrpcView + 'static,
149 {
150 let span = Span::current();
151 let guest_resources = guest_resources.into();
152 let host_resources = host_resources.into();
153 let io_stream_resources = io_stream_resources.into();
154 async move {
155 let func = {
156 let mut store = store.lock().await;
157 let idx = if instance_name.is_empty() {
158 None
159 } else {
160 let idx = instance
161 .get_export_index(store.as_context_mut(), None, instance_name)
162 .with_context(|| format!("export `{instance_name}` not found"))?;
163 Some(idx)
164 };
165 let idx = instance
166 .get_export_index(store.as_context_mut(), idx.as_ref(), name)
167 .with_context(|| format!("export `{name}` not found"))?;
168 instance.get_func(store.as_context_mut(), idx)
169 }
170 .with_context(|| format!("function export `{name}` not found"))?;
171 debug!(instance = instance_name, name, "serving function export");
172 let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
173 let paths = crate::paths::params_async_paths(params_ty.iter(), &io_stream_resources);
174 let invocations = self
175 .serve(instance_name, rpc_func_name(name), paths)
176 .await?;
177 let results_ty: Arc<[_]> = ty.results().collect();
178 let guest_resources = Arc::clone(&guest_resources);
179 let host_resources = Arc::clone(&host_resources);
180 Ok(invocations.map_ok(move |(cx, tx, rx)| {
181 let params_ty = Arc::clone(¶ms_ty);
182 let results_ty = Arc::clone(&results_ty);
183 let guest_resources = Arc::clone(&guest_resources);
184 let host_resources = Arc::clone(&host_resources);
185 let io_stream_resources = Arc::clone(&io_stream_resources);
186 let store = Arc::clone(&store);
187 (
188 cx,
189 Box::pin(
190 async move {
191 let mut store = store.lock().await;
192 call(
193 &mut *store,
194 rx,
195 tx,
196 &guest_resources,
197 &host_resources,
198 &io_stream_resources,
199 params_ty.iter(),
200 &results_ty,
201 func,
202 )
203 .await?;
204 Ok(())
205 }
206 .instrument(span.clone()),
207 ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
208 )
209 }))
210 }
211 }
212}
213
214impl<T: wrpc_transport::Serve> ServeExt for T {}