Skip to main content

selium_kernel/drivers/
process.rs

1use std::{
2    convert::TryFrom,
3    future::{Future, ready},
4    marker::PhantomData,
5    sync::Arc,
6};
7
8use selium_abi::{
9    AbiParam, AbiScalarType, AbiScalarValue, AbiValue, EntrypointArg, EntrypointInvocation,
10    GuestResourceId, ProcessLogLookup, ProcessLogRegistration, ProcessStart,
11};
12use tracing::debug;
13use wasmtime::Caller;
14
15use crate::{
16    KernelError,
17    drivers::Capability,
18    guest_data::{GuestError, GuestResult},
19    operation::{Contract, Operation},
20    registry::{
21        InstanceRegistry, ProcessIdentity, Registry, ResourceHandle, ResourceId, ResourceType,
22    },
23};
24
25type ProcessLifecycleOps<C> = (
26    Arc<Operation<ProcessStartDriver<C>>>,
27    Arc<Operation<ProcessStopDriver<C>>>,
28);
29
30type ProcessLogOps<C> = (
31    Arc<Operation<ProcessRegisterLogDriver<C>>>,
32    Arc<Operation<ProcessLogLookupDriver<C>>>,
33);
34
35/// Capability responsible for starting/stopping guest instances.
36pub trait ProcessLifecycleCapability {
37    type Process: Send;
38    type Error: Into<GuestError>;
39
40    /// Start a new process, identified by `module_id` and `name`
41    fn start(
42        &self,
43        registry: &Arc<Registry>,
44        process_id: ResourceId,
45        module_id: &str,
46        name: &str,
47        capabilities: Vec<Capability>,
48        entrypoint: EntrypointInvocation,
49    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
50
51    /// Stop a running process
52    fn stop(
53        &self,
54        instance: &mut Self::Process,
55    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
56}
57
58/// Hostcall driver that starts new processes.
59pub struct ProcessStartDriver<Impl>(Impl);
60/// Hostcall driver that stops running processes.
61pub struct ProcessStopDriver<Impl>(Impl);
62/// Hostcall driver that records the logging channel exported by a process.
63pub struct ProcessRegisterLogDriver<Impl>(PhantomData<Impl>);
64/// Hostcall driver that fetches the logging channel for a running process.
65pub struct ProcessLogLookupDriver<Impl>(PhantomData<Impl>);
66
67impl<T> ProcessLifecycleCapability for Arc<T>
68where
69    T: ProcessLifecycleCapability,
70{
71    type Process = T::Process;
72    type Error = T::Error;
73
74    fn start(
75        &self,
76        registry: &Arc<Registry>,
77        process_id: ResourceId,
78        module_id: &str,
79        name: &str,
80        capabilities: Vec<Capability>,
81        entrypoint: EntrypointInvocation,
82    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
83        self.as_ref().start(
84            registry,
85            process_id,
86            module_id,
87            name,
88            capabilities,
89            entrypoint,
90        )
91    }
92
93    fn stop(
94        &self,
95        instance: &mut Self::Process,
96    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
97        self.as_ref().stop(instance)
98    }
99}
100
101impl<Impl> Contract for ProcessStartDriver<Impl>
102where
103    Impl: ProcessLifecycleCapability + Clone + Send + 'static,
104{
105    type Input = ProcessStart;
106    type Output = GuestResourceId;
107
108    fn to_future(
109        &self,
110        caller: &mut Caller<'_, InstanceRegistry>,
111        input: Self::Input,
112    ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
113        let inner = self.0.clone();
114        let registry = caller.data().registry_arc();
115        let ProcessStart {
116            module_id,
117            name,
118            capabilities,
119            entrypoint,
120        } = input;
121
122        let preparation =
123            (|| -> GuestResult<(String, String, Vec<Capability>, EntrypointInvocation)> {
124                entrypoint
125                    .validate()
126                    .map_err(|err| GuestError::from(KernelError::Driver(err.to_string())))?;
127                let entrypoint = resolve_entrypoint_resources(entrypoint, caller.data())?;
128                Ok((module_id, name, capabilities, entrypoint))
129            })();
130
131        async move {
132            let (module_id, name, capabilities, entrypoint) = preparation?;
133            debug!(%module_id, %name, capabilities = ?capabilities, "process_start requested");
134            let process_id = registry
135                .reserve(None, ResourceType::Process)
136                .map_err(GuestError::from)?;
137
138            match inner
139                .start(
140                    &registry,
141                    process_id,
142                    &module_id,
143                    &name,
144                    capabilities,
145                    entrypoint,
146                )
147                .await
148            {
149                Ok(()) => {}
150                Err(err) => {
151                    registry.discard(process_id);
152                    return Err(err.into());
153                }
154            }
155
156            let handle = GuestResourceId::try_from(process_id)
157                .map_err(|_| GuestError::from(KernelError::InvalidHandle))?;
158            Ok(handle)
159        }
160    }
161}
162
163impl<Impl> Contract for ProcessStopDriver<Impl>
164where
165    Impl: ProcessLifecycleCapability + Clone + Send + 'static,
166{
167    type Input = GuestResourceId;
168    type Output = ();
169
170    fn to_future(
171        &self,
172        caller: &mut Caller<'_, InstanceRegistry>,
173        input: Self::Input,
174    ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
175        let inner = self.0.clone();
176        let registry = caller.data().registry_arc();
177
178        async move {
179            let handle = ResourceId::try_from(input).map_err(|_| GuestError::InvalidArgument)?;
180            if let Some(meta) = registry.metadata(handle)
181                && meta.kind != ResourceType::Process
182            {
183                return Err(GuestError::InvalidArgument);
184            }
185            let mut process = registry
186                .remove(ResourceHandle::<Impl::Process>::new(handle))
187                .ok_or(GuestError::NotFound)?;
188            inner.stop(&mut process).await.map_err(Into::into)?;
189            Ok(())
190        }
191    }
192}
193
194impl<Impl> Contract for ProcessRegisterLogDriver<Impl>
195where
196    Impl: ProcessLifecycleCapability + Clone + Send + 'static,
197{
198    type Input = ProcessLogRegistration;
199    type Output = ();
200
201    fn to_future(
202        &self,
203        caller: &mut Caller<'_, InstanceRegistry>,
204        input: Self::Input,
205    ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
206        let identity = caller
207            .data()
208            .extension::<ProcessIdentity>()
209            .map(|identity| *identity);
210        let registry = caller.data().registry_arc();
211
212        ready((|| -> GuestResult<Self::Output> {
213            let identity = identity.ok_or(GuestError::PermissionDenied)?;
214            let process_id = identity.raw();
215            let channel_id = registry
216                .resolve_shared(input.channel)
217                .ok_or(GuestError::NotFound)?;
218
219            match registry.metadata(process_id) {
220                Some(meta) if meta.kind == ResourceType::Process => {}
221                Some(_) => return Err(GuestError::InvalidArgument),
222                None => return Err(GuestError::NotFound),
223            }
224
225            match registry.metadata(channel_id) {
226                Some(meta) if meta.kind == ResourceType::Channel => {}
227                Some(_) => return Err(GuestError::InvalidArgument),
228                None => return Err(GuestError::NotFound),
229            }
230
231            registry
232                .set_log_channel(process_id, channel_id)
233                .map_err(GuestError::from)?;
234            Ok(())
235        })())
236    }
237}
238
239impl<Impl> Contract for ProcessLogLookupDriver<Impl>
240where
241    Impl: ProcessLifecycleCapability + Clone + Send + 'static,
242{
243    type Input = ProcessLogLookup;
244    type Output = GuestResourceId;
245
246    fn to_future(
247        &self,
248        caller: &mut Caller<'_, InstanceRegistry>,
249        input: Self::Input,
250    ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
251        let registry = caller.data().registry_arc();
252
253        ready(
254            ResourceId::try_from(input.process_id)
255                .map_err(|_| GuestError::InvalidArgument)
256                .and_then(|id| match registry.metadata(id) {
257                    Some(meta) if meta.kind == ResourceType::Process => {
258                        registry.log_channel_handle(id).ok_or(GuestError::NotFound)
259                    }
260                    Some(_) => Err(GuestError::InvalidArgument),
261                    None => Err(GuestError::NotFound),
262                }),
263        )
264    }
265}
266
267/// Helpers for working with entrypoint invocations inside the kernel.
268pub trait EntrypointInvocationExt {
269    fn materialise_values(
270        &self,
271        registry: &mut InstanceRegistry,
272    ) -> Result<Vec<AbiValue>, KernelError>;
273}
274
275impl EntrypointInvocationExt for EntrypointInvocation {
276    fn materialise_values(
277        &self,
278        registry: &mut InstanceRegistry,
279    ) -> Result<Vec<AbiValue>, KernelError> {
280        let mut values = Vec::with_capacity(self.args.len());
281
282        for (index, (param, arg)) in self
283            .signature
284            .params()
285            .iter()
286            .zip(self.args.iter())
287            .enumerate()
288        {
289            match (param, arg) {
290                (AbiParam::Scalar(_), EntrypointArg::Scalar(value)) => {
291                    values.push(AbiValue::Scalar(*value));
292                }
293                (AbiParam::Scalar(AbiScalarType::I32), EntrypointArg::Resource(resource_id)) => {
294                    let resource =
295                        ResourceId::try_from(*resource_id).map_err(KernelError::IntConvert)?;
296                    let slot = registry.insert_id(resource).map_err(KernelError::from)?;
297                    let slot = i32::try_from(slot).map_err(KernelError::IntConvert)?;
298                    values.push(AbiValue::Scalar(AbiScalarValue::I32(slot)));
299                }
300                (AbiParam::Scalar(AbiScalarType::U64), EntrypointArg::Resource(resource_id)) => {
301                    if registry.registry().resolve_shared(*resource_id).is_none() {
302                        return Err(KernelError::Driver(format!(
303                            "argument {index} references unknown shared resource"
304                        )));
305                    }
306                    values.push(AbiValue::Scalar(AbiScalarValue::U64(*resource_id)));
307                }
308                (AbiParam::Buffer, EntrypointArg::Buffer(bytes)) => {
309                    values.push(AbiValue::Buffer(bytes.clone()));
310                }
311                _ => {
312                    return Err(KernelError::Driver(format!(
313                        "argument {index} incompatible with signature"
314                    )));
315                }
316            }
317        }
318
319        Ok(values)
320    }
321}
322
323fn resolve_entrypoint_resources(
324    entrypoint: EntrypointInvocation,
325    registry: &InstanceRegistry,
326) -> GuestResult<EntrypointInvocation> {
327    let signature = entrypoint.signature;
328    let mut resolved = Vec::with_capacity(entrypoint.args.len());
329
330    for (index, (param, arg)) in signature
331        .params()
332        .iter()
333        .zip(entrypoint.args.into_iter())
334        .enumerate()
335    {
336        let arg = match (param, arg) {
337            (AbiParam::Scalar(AbiScalarType::I32), EntrypointArg::Resource(handle)) => {
338                let slot = usize::try_from(handle)
339                    .map_err(|err| GuestError::from(KernelError::IntConvert(err)))?;
340                let rid = registry.entry(slot).ok_or(GuestError::NotFound)?;
341                let rid = GuestResourceId::try_from(rid).map_err(|_| {
342                    GuestError::from(KernelError::Driver("invalid handle".to_string()))
343                })?;
344                EntrypointArg::Resource(rid)
345            }
346            (AbiParam::Scalar(AbiScalarType::U64), EntrypointArg::Resource(handle)) => {
347                if registry.registry().resolve_shared(handle).is_none() {
348                    return Err(GuestError::from(KernelError::Driver(format!(
349                        "argument {index} references unknown shared resource"
350                    ))));
351                }
352                EntrypointArg::Resource(handle)
353            }
354            (_, arg) => arg,
355        };
356        resolved.push(arg);
357    }
358
359    Ok(EntrypointInvocation {
360        signature,
361        args: resolved,
362    })
363}
364
365/// Build hostcall operations for process lifecycle management.
366pub fn lifecycle_ops<C>(cap: C) -> ProcessLifecycleOps<C>
367where
368    C: ProcessLifecycleCapability + Clone + Send + 'static,
369{
370    (
371        Operation::from_hostcall(
372            ProcessStartDriver(cap.clone()),
373            selium_abi::hostcall_contract!(PROCESS_START),
374        ),
375        Operation::from_hostcall(
376            ProcessStopDriver(cap),
377            selium_abi::hostcall_contract!(PROCESS_STOP),
378        ),
379    )
380}
381
382/// Build hostcall operations for process log channel metadata.
383pub fn log_ops<C>() -> ProcessLogOps<C>
384where
385    C: ProcessLifecycleCapability + Clone + Send + 'static,
386{
387    (
388        Operation::from_hostcall(
389            ProcessRegisterLogDriver(PhantomData),
390            selium_abi::hostcall_contract!(PROCESS_REGISTER_LOG),
391        ),
392        Operation::from_hostcall(
393            ProcessLogLookupDriver(PhantomData),
394            selium_abi::hostcall_contract!(PROCESS_LOG_CHANNEL),
395        ),
396    )
397}