1use std::{
2 convert::TryFrom,
3 future::{Future, ready},
4 marker::PhantomData,
5 sync::Arc,
6};
7
8use selium_abi::{
9 AbiParam, AbiScalarType, AbiScalarValue, AbiValue, EntrypointArg, EntrypointInvocation,
10 GuestResourceId, ProcessLogLookup, ProcessLogRegistration, ProcessStart,
11};
12use tracing::debug;
13use wasmtime::Caller;
14
15use crate::{
16 KernelError,
17 drivers::Capability,
18 guest_data::{GuestError, GuestResult},
19 operation::{Contract, Operation},
20 registry::{
21 InstanceRegistry, ProcessIdentity, Registry, ResourceHandle, ResourceId, ResourceType,
22 },
23};
24
25type ProcessLifecycleOps<C> = (
26 Arc<Operation<ProcessStartDriver<C>>>,
27 Arc<Operation<ProcessStopDriver<C>>>,
28);
29
30type ProcessLogOps<C> = (
31 Arc<Operation<ProcessRegisterLogDriver<C>>>,
32 Arc<Operation<ProcessLogLookupDriver<C>>>,
33);
34
35pub trait ProcessLifecycleCapability {
37 type Process: Send;
38 type Error: Into<GuestError>;
39
40 fn start(
42 &self,
43 registry: &Arc<Registry>,
44 process_id: ResourceId,
45 module_id: &str,
46 name: &str,
47 capabilities: Vec<Capability>,
48 entrypoint: EntrypointInvocation,
49 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
50
51 fn stop(
53 &self,
54 instance: &mut Self::Process,
55 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
56}
57
58pub struct ProcessStartDriver<Impl>(Impl);
60pub struct ProcessStopDriver<Impl>(Impl);
62pub struct ProcessRegisterLogDriver<Impl>(PhantomData<Impl>);
64pub struct ProcessLogLookupDriver<Impl>(PhantomData<Impl>);
66
67impl<T> ProcessLifecycleCapability for Arc<T>
68where
69 T: ProcessLifecycleCapability,
70{
71 type Process = T::Process;
72 type Error = T::Error;
73
74 fn start(
75 &self,
76 registry: &Arc<Registry>,
77 process_id: ResourceId,
78 module_id: &str,
79 name: &str,
80 capabilities: Vec<Capability>,
81 entrypoint: EntrypointInvocation,
82 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
83 self.as_ref().start(
84 registry,
85 process_id,
86 module_id,
87 name,
88 capabilities,
89 entrypoint,
90 )
91 }
92
93 fn stop(
94 &self,
95 instance: &mut Self::Process,
96 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
97 self.as_ref().stop(instance)
98 }
99}
100
101impl<Impl> Contract for ProcessStartDriver<Impl>
102where
103 Impl: ProcessLifecycleCapability + Clone + Send + 'static,
104{
105 type Input = ProcessStart;
106 type Output = GuestResourceId;
107
108 fn to_future(
109 &self,
110 caller: &mut Caller<'_, InstanceRegistry>,
111 input: Self::Input,
112 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
113 let inner = self.0.clone();
114 let registry = caller.data().registry_arc();
115 let ProcessStart {
116 module_id,
117 name,
118 capabilities,
119 entrypoint,
120 } = input;
121
122 let preparation =
123 (|| -> GuestResult<(String, String, Vec<Capability>, EntrypointInvocation)> {
124 entrypoint
125 .validate()
126 .map_err(|err| GuestError::from(KernelError::Driver(err.to_string())))?;
127 let entrypoint = resolve_entrypoint_resources(entrypoint, caller.data())?;
128 Ok((module_id, name, capabilities, entrypoint))
129 })();
130
131 async move {
132 let (module_id, name, capabilities, entrypoint) = preparation?;
133 debug!(%module_id, %name, capabilities = ?capabilities, "process_start requested");
134 let process_id = registry
135 .reserve(None, ResourceType::Process)
136 .map_err(GuestError::from)?;
137
138 match inner
139 .start(
140 ®istry,
141 process_id,
142 &module_id,
143 &name,
144 capabilities,
145 entrypoint,
146 )
147 .await
148 {
149 Ok(()) => {}
150 Err(err) => {
151 registry.discard(process_id);
152 return Err(err.into());
153 }
154 }
155
156 let handle = GuestResourceId::try_from(process_id)
157 .map_err(|_| GuestError::from(KernelError::InvalidHandle))?;
158 Ok(handle)
159 }
160 }
161}
162
163impl<Impl> Contract for ProcessStopDriver<Impl>
164where
165 Impl: ProcessLifecycleCapability + Clone + Send + 'static,
166{
167 type Input = GuestResourceId;
168 type Output = ();
169
170 fn to_future(
171 &self,
172 caller: &mut Caller<'_, InstanceRegistry>,
173 input: Self::Input,
174 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
175 let inner = self.0.clone();
176 let registry = caller.data().registry_arc();
177
178 async move {
179 let handle = ResourceId::try_from(input).map_err(|_| GuestError::InvalidArgument)?;
180 if let Some(meta) = registry.metadata(handle)
181 && meta.kind != ResourceType::Process
182 {
183 return Err(GuestError::InvalidArgument);
184 }
185 let mut process = registry
186 .remove(ResourceHandle::<Impl::Process>::new(handle))
187 .ok_or(GuestError::NotFound)?;
188 inner.stop(&mut process).await.map_err(Into::into)?;
189 Ok(())
190 }
191 }
192}
193
194impl<Impl> Contract for ProcessRegisterLogDriver<Impl>
195where
196 Impl: ProcessLifecycleCapability + Clone + Send + 'static,
197{
198 type Input = ProcessLogRegistration;
199 type Output = ();
200
201 fn to_future(
202 &self,
203 caller: &mut Caller<'_, InstanceRegistry>,
204 input: Self::Input,
205 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
206 let identity = caller
207 .data()
208 .extension::<ProcessIdentity>()
209 .map(|identity| *identity);
210 let registry = caller.data().registry_arc();
211
212 ready((|| -> GuestResult<Self::Output> {
213 let identity = identity.ok_or(GuestError::PermissionDenied)?;
214 let process_id = identity.raw();
215 let channel_id = registry
216 .resolve_shared(input.channel)
217 .ok_or(GuestError::NotFound)?;
218
219 match registry.metadata(process_id) {
220 Some(meta) if meta.kind == ResourceType::Process => {}
221 Some(_) => return Err(GuestError::InvalidArgument),
222 None => return Err(GuestError::NotFound),
223 }
224
225 match registry.metadata(channel_id) {
226 Some(meta) if meta.kind == ResourceType::Channel => {}
227 Some(_) => return Err(GuestError::InvalidArgument),
228 None => return Err(GuestError::NotFound),
229 }
230
231 registry
232 .set_log_channel(process_id, channel_id)
233 .map_err(GuestError::from)?;
234 Ok(())
235 })())
236 }
237}
238
239impl<Impl> Contract for ProcessLogLookupDriver<Impl>
240where
241 Impl: ProcessLifecycleCapability + Clone + Send + 'static,
242{
243 type Input = ProcessLogLookup;
244 type Output = GuestResourceId;
245
246 fn to_future(
247 &self,
248 caller: &mut Caller<'_, InstanceRegistry>,
249 input: Self::Input,
250 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
251 let registry = caller.data().registry_arc();
252
253 ready(
254 ResourceId::try_from(input.process_id)
255 .map_err(|_| GuestError::InvalidArgument)
256 .and_then(|id| match registry.metadata(id) {
257 Some(meta) if meta.kind == ResourceType::Process => {
258 registry.log_channel_handle(id).ok_or(GuestError::NotFound)
259 }
260 Some(_) => Err(GuestError::InvalidArgument),
261 None => Err(GuestError::NotFound),
262 }),
263 )
264 }
265}
266
267pub trait EntrypointInvocationExt {
269 fn materialise_values(
270 &self,
271 registry: &mut InstanceRegistry,
272 ) -> Result<Vec<AbiValue>, KernelError>;
273}
274
275impl EntrypointInvocationExt for EntrypointInvocation {
276 fn materialise_values(
277 &self,
278 registry: &mut InstanceRegistry,
279 ) -> Result<Vec<AbiValue>, KernelError> {
280 let mut values = Vec::with_capacity(self.args.len());
281
282 for (index, (param, arg)) in self
283 .signature
284 .params()
285 .iter()
286 .zip(self.args.iter())
287 .enumerate()
288 {
289 match (param, arg) {
290 (AbiParam::Scalar(_), EntrypointArg::Scalar(value)) => {
291 values.push(AbiValue::Scalar(*value));
292 }
293 (AbiParam::Scalar(AbiScalarType::I32), EntrypointArg::Resource(resource_id)) => {
294 let resource =
295 ResourceId::try_from(*resource_id).map_err(KernelError::IntConvert)?;
296 let slot = registry.insert_id(resource).map_err(KernelError::from)?;
297 let slot = i32::try_from(slot).map_err(KernelError::IntConvert)?;
298 values.push(AbiValue::Scalar(AbiScalarValue::I32(slot)));
299 }
300 (AbiParam::Scalar(AbiScalarType::U64), EntrypointArg::Resource(resource_id)) => {
301 if registry.registry().resolve_shared(*resource_id).is_none() {
302 return Err(KernelError::Driver(format!(
303 "argument {index} references unknown shared resource"
304 )));
305 }
306 values.push(AbiValue::Scalar(AbiScalarValue::U64(*resource_id)));
307 }
308 (AbiParam::Buffer, EntrypointArg::Buffer(bytes)) => {
309 values.push(AbiValue::Buffer(bytes.clone()));
310 }
311 _ => {
312 return Err(KernelError::Driver(format!(
313 "argument {index} incompatible with signature"
314 )));
315 }
316 }
317 }
318
319 Ok(values)
320 }
321}
322
323fn resolve_entrypoint_resources(
324 entrypoint: EntrypointInvocation,
325 registry: &InstanceRegistry,
326) -> GuestResult<EntrypointInvocation> {
327 let signature = entrypoint.signature;
328 let mut resolved = Vec::with_capacity(entrypoint.args.len());
329
330 for (index, (param, arg)) in signature
331 .params()
332 .iter()
333 .zip(entrypoint.args.into_iter())
334 .enumerate()
335 {
336 let arg = match (param, arg) {
337 (AbiParam::Scalar(AbiScalarType::I32), EntrypointArg::Resource(handle)) => {
338 let slot = usize::try_from(handle)
339 .map_err(|err| GuestError::from(KernelError::IntConvert(err)))?;
340 let rid = registry.entry(slot).ok_or(GuestError::NotFound)?;
341 let rid = GuestResourceId::try_from(rid).map_err(|_| {
342 GuestError::from(KernelError::Driver("invalid handle".to_string()))
343 })?;
344 EntrypointArg::Resource(rid)
345 }
346 (AbiParam::Scalar(AbiScalarType::U64), EntrypointArg::Resource(handle)) => {
347 if registry.registry().resolve_shared(handle).is_none() {
348 return Err(GuestError::from(KernelError::Driver(format!(
349 "argument {index} references unknown shared resource"
350 ))));
351 }
352 EntrypointArg::Resource(handle)
353 }
354 (_, arg) => arg,
355 };
356 resolved.push(arg);
357 }
358
359 Ok(EntrypointInvocation {
360 signature,
361 args: resolved,
362 })
363}
364
365pub fn lifecycle_ops<C>(cap: C) -> ProcessLifecycleOps<C>
367where
368 C: ProcessLifecycleCapability + Clone + Send + 'static,
369{
370 (
371 Operation::from_hostcall(
372 ProcessStartDriver(cap.clone()),
373 selium_abi::hostcall_contract!(PROCESS_START),
374 ),
375 Operation::from_hostcall(
376 ProcessStopDriver(cap),
377 selium_abi::hostcall_contract!(PROCESS_STOP),
378 ),
379 )
380}
381
382pub fn log_ops<C>() -> ProcessLogOps<C>
384where
385 C: ProcessLifecycleCapability + Clone + Send + 'static,
386{
387 (
388 Operation::from_hostcall(
389 ProcessRegisterLogDriver(PhantomData),
390 selium_abi::hostcall_contract!(PROCESS_REGISTER_LOG),
391 ),
392 Operation::from_hostcall(
393 ProcessLogLookupDriver(PhantomData),
394 selium_abi::hostcall_contract!(PROCESS_LOG_CHANNEL),
395 ),
396 )
397}