1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::sync::Arc;

use anyhow::Result;
use log::trace;
use tokio::task::JoinHandle;
use wasmtime::{ResourceLimiter, Val};

use crate::env::Environment;
use crate::runtimes::wasmtime::{WasmtimeCompiledModule, WasmtimeRuntime};
use crate::state::ProcessState;
use crate::{Process, Signal, WasmProcess};

/// Spawns a new wasm process from a compiled module.
///
/// A `Process` is created from a `module`, entry `function`, array of arguments and config. The
/// configuration will define some characteristics of the process, such as maximum memory, fuel
/// and host function properties (filesystem access, networking, ..).
///
/// After it's spawned the process will keep running in the background. A process can be killed
/// with `Signal::Kill` signal. If you would like to block until the process is finished you can
/// `.await` on the returned `JoinHandle<()>`.
impl Environment {
    pub async fn spawn_wasm<S>(
        &self,
        runtime: WasmtimeRuntime,
        module: WasmtimeCompiledModule<S>,
        state: S,
        function: &str,
        params: Vec<Val>,
        link: Option<(Option<i64>, Arc<dyn Process>)>,
        //) -> Result<(JoinHandle<anyhow::Error>, Arc<dyn Process>)>
    ) -> Result<(JoinHandle<Result<S>>, Arc<dyn Process>)>
    where
        S: ProcessState + Send + ResourceLimiter + 'static,
    {
        let id = state.id();
        trace!("Spawning process: {}", id);
        let signal_mailbox = state.signal_mailbox().clone();
        let message_mailbox = state.message_mailbox().clone();

        let instance = runtime.instantiate(&module, state).await?;
        let function = function.to_string();
        let fut = async move { instance.call(&function, params).await };
        let child_process = crate::new(fut, id, self.clone(), signal_mailbox.1, message_mailbox);
        let child_process_handle = Arc::new(WasmProcess::new(id, signal_mailbox.0.clone()));

        self.add_process(id, child_process_handle.clone());

        // **Child link guarantees**:
        // The link signal is going to be put inside of the child's mailbox and is going to be
        // processed before any child code can run. This means that any failure inside the child
        // Wasm code will be correctly reported to the parent.
        //
        // We assume here that the code inside of `process::new()` will not fail during signal
        // handling.
        //
        // **Parent link guarantees**:
        // A `tokio::task::yield_now()` call is executed to allow the parent to link the child
        // before continuing any further execution. This should force the parent to process all
        // signals right away.
        //
        // The parent could have received a `kill` signal in its mailbox before this function was
        // called and this signal is going to be processed before the link is established (FIFO).
        // Only after the yield function we can guarantee that the child is going to be notified
        // if the parent fails. This is ok, as the actual spawning of the child happens after the
        // call, so the child wouldn't even exist if the parent failed before.
        //
        // TODO: The guarantees provided here don't hold anymore in a distributed environment and
        //       will require some rethinking. This function will be executed on a completely
        //       different computer and needs to be synced in a more robust way with the parent
        //       running somewhere else.
        if let Some((tag, process)) = link {
            // Send signal to itself to perform the linking
            process.send(Signal::Link(None, child_process_handle.clone()));
            // Suspend itself to process all new signals
            tokio::task::yield_now().await;
            // Send signal to child to link it
            signal_mailbox
                .0
                .send(Signal::Link(tag, process))
                .expect("receiver must exist at this point");
        }

        // Spawn a background process
        trace!("Process size: {}", std::mem::size_of_val(&child_process));
        let join = tokio::task::spawn(child_process);
        Ok((join, child_process_handle))
    }
}