lunatic_process_api/
lib.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    future::Future,
4    io::Write,
5    path::Path,
6    sync::Arc,
7    time::{Duration, Instant},
8};
9
10use anyhow::{anyhow, Result};
11use hash_map_id::HashMapId;
12use lunatic_common_api::{get_memory, IntoTrap};
13use lunatic_distributed::DistributedCtx;
14use lunatic_error_api::ErrorCtx;
15use lunatic_process::{
16    config::ProcessConfig,
17    env::Environment,
18    mailbox::MessageMailbox,
19    message::Message,
20    runtimes::{wasmtime::WasmtimeCompiledModule, RawWasm},
21    state::ProcessState,
22    DeathReason, Process, Signal, WasmProcess,
23};
24use lunatic_wasi_api::LunaticWasiCtx;
25use wasmtime::{Caller, Linker, ResourceLimiter, Val};
26
27pub type ProcessResources = HashMapId<Arc<dyn Process>>;
28pub type ModuleResources<S> = HashMapId<Arc<WasmtimeCompiledModule<S>>>;
29
30pub trait ProcessConfigCtx {
31    fn can_compile_modules(&self) -> bool;
32    fn set_can_compile_modules(&mut self, can: bool);
33    fn can_create_configs(&self) -> bool;
34    fn set_can_create_configs(&mut self, can: bool);
35    fn can_spawn_processes(&self) -> bool;
36    fn set_can_spawn_processes(&mut self, can: bool);
37    fn can_access_fs_location(&self, path: &Path) -> Result<(), String>;
38}
39
40pub trait ProcessCtx<S: ProcessState> {
41    fn mailbox(&mut self) -> &mut MessageMailbox;
42    fn message_scratch_area(&mut self) -> &mut Option<Message>;
43    fn module_resources(&self) -> &ModuleResources<S>;
44    fn module_resources_mut(&mut self) -> &mut ModuleResources<S>;
45    fn environment(&self) -> Arc<dyn Environment>;
46}
47
48// Register the process APIs to the linker
49pub fn register<T, E>(linker: &mut Linker<T>) -> Result<()>
50where
51    T: ProcessState
52        + ProcessCtx<T>
53        + DistributedCtx<E>
54        + ErrorCtx
55        + LunaticWasiCtx
56        + Send
57        + Sync
58        + ResourceLimiter
59        + 'static,
60    for<'a> &'a T: Send,
61    T::Config: ProcessConfigCtx,
62    E: Environment + 'static,
63{
64    #[cfg(feature = "metrics")]
65    lunatic_process::describe_metrics();
66
67    #[cfg(feature = "metrics")]
68    metrics::describe_counter!(
69        "lunatic.process.modules.compiled",
70        metrics::Unit::Count,
71        "number of modules compiled since startup"
72    );
73
74    #[cfg(feature = "metrics")]
75    metrics::describe_counter!(
76        "lunatic.process.modules.dropped",
77        metrics::Unit::Count,
78        "number of modules dropped since startup"
79    );
80
81    #[cfg(feature = "metrics")]
82    metrics::describe_gauge!(
83        "lunatic.process.modules.active",
84        metrics::Unit::Count,
85        "number of modules currently in memory"
86    );
87
88    #[cfg(feature = "metrics")]
89    metrics::describe_histogram!(
90        "lunatic.process.modules.compiled.duration",
91        metrics::Unit::Seconds,
92        "Duration of module compilation"
93    );
94
95    linker.func_wrap("lunatic::process", "compile_module", compile_module)?;
96    linker.func_wrap("lunatic::process", "drop_module", drop_module)?;
97
98    #[cfg(feature = "metrics")]
99    metrics::describe_counter!(
100        "lunatic.process.configs.created",
101        metrics::Unit::Count,
102        "number of configs created since startup"
103    );
104
105    #[cfg(feature = "metrics")]
106    metrics::describe_counter!(
107        "lunatic.process.configs.dropped",
108        metrics::Unit::Count,
109        "number of configs dropped since startup"
110    );
111
112    #[cfg(feature = "metrics")]
113    metrics::describe_gauge!(
114        "lunatic.process.configs.active",
115        metrics::Unit::Count,
116        "number of configs currently in memory"
117    );
118
119    linker.func_wrap("lunatic::process", "create_config", create_config)?;
120    linker.func_wrap("lunatic::process", "drop_config", drop_config)?;
121    linker.func_wrap(
122        "lunatic::process",
123        "config_set_max_memory",
124        config_set_max_memory,
125    )?;
126    linker.func_wrap(
127        "lunatic::process",
128        "config_get_max_memory",
129        config_get_max_memory,
130    )?;
131    linker.func_wrap(
132        "lunatic::process",
133        "config_set_max_fuel",
134        config_set_max_fuel,
135    )?;
136    linker.func_wrap(
137        "lunatic::process",
138        "config_get_max_fuel",
139        config_get_max_fuel,
140    )?;
141    linker.func_wrap(
142        "lunatic::process",
143        "config_can_compile_modules",
144        config_can_compile_modules,
145    )?;
146    linker.func_wrap(
147        "lunatic::process",
148        "config_set_can_compile_modules",
149        config_set_can_compile_modules,
150    )?;
151    linker.func_wrap(
152        "lunatic::process",
153        "config_can_create_configs",
154        config_can_create_configs,
155    )?;
156    linker.func_wrap(
157        "lunatic::process",
158        "config_set_can_create_configs",
159        config_set_can_create_configs,
160    )?;
161    linker.func_wrap(
162        "lunatic::process",
163        "config_can_spawn_processes",
164        config_can_spawn_processes,
165    )?;
166    linker.func_wrap(
167        "lunatic::process",
168        "config_set_can_spawn_processes",
169        config_set_can_spawn_processes,
170    )?;
171
172    linker.func_wrap8_async("lunatic::process", "spawn", spawn)?;
173    linker.func_wrap11_async("lunatic::process", "get_or_spawn", get_or_spawn)?;
174    linker.func_wrap1_async("lunatic::process", "sleep_ms", sleep_ms)?;
175    linker.func_wrap("lunatic::process", "die_when_link_dies", die_when_link_dies)?;
176
177    linker.func_wrap("lunatic::process", "process_id", process_id)?;
178    linker.func_wrap("lunatic::process", "environment_id", environment_id)?;
179    linker.func_wrap("lunatic::process", "link", link)?;
180    linker.func_wrap("lunatic::process", "unlink", unlink)?;
181    linker.func_wrap("lunatic::process", "monitor", monitor)?;
182    linker.func_wrap("lunatic::process", "stop_monitoring", stop_monitoring)?;
183    linker.func_wrap("lunatic::process", "kill", kill)?;
184    linker.func_wrap("lunatic::process", "exists", exists)?;
185    Ok(())
186}
187
188// Compile a new WebAssembly module.
189//
190// The `spawn` function can be used to spawn new processes from the module.
191// Module compilation can be a CPU intensive task.
192//
193// Returns:
194// *  0 on success - The ID of the newly created module is written to **id_ptr**
195// *  1 on error   - The error ID is written to **id_ptr**
196// * -1 in case the process doesn't have permission to compile modules.
197fn compile_module<T>(
198    mut caller: Caller<T>,
199    module_data_ptr: u32,
200    module_data_len: u32,
201    id_ptr: u32,
202) -> Result<i32>
203where
204    T: ProcessState + ProcessCtx<T> + ErrorCtx,
205    T::Config: ProcessConfigCtx,
206{
207    // TODO: Module compilation is CPU intensive and should be done on the blocking task thread pool.
208    if !caller.data().config().can_compile_modules() {
209        return Ok(-1);
210    }
211
212    #[cfg(feature = "metrics")]
213    metrics::increment_counter!("lunatic.process.modules.compiled");
214
215    #[cfg(feature = "metrics")]
216    metrics::increment_gauge!("lunatic.process.modules.active", 1.0);
217
218    #[cfg(feature = "metrics")]
219    let start = Instant::now();
220
221    let mut module = vec![0; module_data_len as usize];
222    let memory = get_memory(&mut caller)?;
223    memory
224        .read(&caller, module_data_ptr as usize, module.as_mut_slice())
225        .or_trap("lunatic::process::compile_module")?;
226
227    let module = RawWasm::new(None, module);
228    let (mod_or_error_id, result) = match caller.data().runtime().compile_module(module) {
229        Ok(module) => (
230            caller
231                .data_mut()
232                .module_resources_mut()
233                .add(Arc::new(module)),
234            0,
235        ),
236        Err(error) => (caller.data_mut().error_resources_mut().add(error), 1),
237    };
238
239    #[cfg(feature = "metrics")]
240    let duration = Instant::now() - start;
241    #[cfg(feature = "metrics")]
242    metrics::histogram!("lunatic.process.modules.compiled.duration", duration);
243
244    memory
245        .write(&mut caller, id_ptr as usize, &mod_or_error_id.to_le_bytes())
246        .or_trap("lunatic::process::compile_module")?;
247    Ok(result)
248}
249
250// Drops the module from resources.
251//
252// Traps:
253// * If the module ID doesn't exist.
254fn drop_module<T: ProcessState + ProcessCtx<T>>(
255    mut caller: Caller<T>,
256    module_id: u64,
257) -> Result<()> {
258    #[cfg(feature = "metrics")]
259    metrics::increment_counter!("lunatic.process.modules.dropped");
260
261    #[cfg(feature = "metrics")]
262    metrics::decrement_gauge!("lunatic.process.modules.active", 1.0);
263
264    caller
265        .data_mut()
266        .module_resources_mut()
267        .remove(module_id)
268        .or_trap("lunatic::process::drop_module: Module ID doesn't exist")?;
269    Ok(())
270}
271
272// Create a new configuration with all permissions denied.
273//
274// There is no memory or fuel limit set on the newly created configuration.
275//
276// Returns:
277// * ID of newly created configuration in case of success
278// * -1 in case the process doesn't have permission to create new configurations
279fn create_config<T>(mut caller: Caller<T>) -> i64
280where
281    T: ProcessState + ProcessCtx<T>,
282    T::Config: ProcessConfigCtx,
283{
284    if !caller.data().config().can_create_configs() {
285        return -1;
286    }
287    let config = T::Config::default();
288    #[cfg(feature = "metrics")]
289    metrics::increment_counter!("lunatic.process.configs.created");
290    #[cfg(feature = "metrics")]
291    metrics::increment_gauge!("lunatic.process.configs.active", 1.0);
292    caller.data_mut().config_resources_mut().add(config) as i64
293}
294
295// Drops the configuration from resources.
296//
297// Traps:
298// * If the config ID doesn't exist.
299fn drop_config<T: ProcessState + ProcessCtx<T>>(
300    mut caller: Caller<T>,
301    config_id: u64,
302) -> Result<()> {
303    caller
304        .data_mut()
305        .config_resources_mut()
306        .remove(config_id)
307        .or_trap("lunatic::process::drop_config: Config ID doesn't exist")?;
308    #[cfg(feature = "metrics")]
309    metrics::increment_counter!("lunatic.process.configs.dropped");
310    #[cfg(feature = "metrics")]
311    metrics::decrement_gauge!("lunatic.process.configs.active", 1.0);
312    Ok(())
313}
314
315// Sets the memory limit on a configuration.
316//
317// Traps:
318// * If max_memory is bigger than the platform maximum.
319// * If the config ID doesn't exist.
320fn config_set_max_memory<T: ProcessState + ProcessCtx<T>>(
321    mut caller: Caller<T>,
322    config_id: u64,
323    max_memory: u64,
324) -> Result<()> {
325    let max_memory = usize::try_from(max_memory)
326        .or_trap("lunatic::process::config_set_max_memory: max_memory exceeds platform max")?;
327    caller
328        .data_mut()
329        .config_resources_mut()
330        .get_mut(config_id)
331        .or_trap("lunatic::process::config_set_max_memory: Config ID doesn't exist")?
332        .set_max_memory(max_memory);
333    Ok(())
334}
335
336// Returns the memory limit of a configuration.
337//
338// Traps:
339// * If the config ID doesn't exist.
340fn config_get_max_memory<T: ProcessState + ProcessCtx<T>>(
341    caller: Caller<T>,
342    config_id: u64,
343) -> Result<u64> {
344    let max_memory = caller
345        .data()
346        .config_resources()
347        .get(config_id)
348        .or_trap("lunatic::process::config_get_max_memory: Config ID doesn't exist")?
349        .get_max_memory();
350    Ok(max_memory as u64)
351}
352
353// Sets the fuel limit on a configuration.
354//
355// A value of 0 indicates no fuel limit.
356//
357// Traps:
358// * If the config ID doesn't exist.
359fn config_set_max_fuel<T: ProcessState + ProcessCtx<T>>(
360    mut caller: Caller<T>,
361    config_id: u64,
362    max_fuel: u64,
363) -> Result<()> {
364    let max_fuel = match max_fuel {
365        0 => None,
366        max_fuel => Some(max_fuel),
367    };
368
369    caller
370        .data_mut()
371        .config_resources_mut()
372        .get_mut(config_id)
373        .or_trap("lunatic::process::config_set_max_fuel: Config ID doesn't exist")?
374        .set_max_fuel(max_fuel);
375    Ok(())
376}
377
378// Returns the fuel limit of a configuration.
379//
380// A value of 0 indicates no fuel limit.
381//
382// Traps:
383// * If the config ID doesn't exist.
384fn config_get_max_fuel<T: ProcessState + ProcessCtx<T>>(
385    caller: Caller<T>,
386    config_id: u64,
387) -> Result<u64> {
388    let max_fuel = caller
389        .data()
390        .config_resources()
391        .get(config_id)
392        .or_trap("lunatic::process::config_get_max_fuel: Config ID doesn't exist")?
393        .get_max_fuel();
394    match max_fuel {
395        None => Ok(0),
396        Some(max_fuel) => Ok(max_fuel),
397    }
398}
399
400// Returns 1 if processes spawned from this configuration can compile Wasm modules, otherwise 0.
401//
402// Traps:
403// * If the config ID doesn't exist.
404fn config_can_compile_modules<T>(caller: Caller<T>, config_id: u64) -> Result<u32>
405where
406    T: ProcessState + ProcessCtx<T>,
407    T::Config: ProcessConfigCtx,
408{
409    let can = caller
410        .data()
411        .config_resources()
412        .get(config_id)
413        .or_trap("lunatic::process::config_can_compile_modules: Config ID doesn't exist")?
414        .can_compile_modules();
415    Ok(can as u32)
416}
417
418// If set to a value >0 (true), processes spawned from this configuration will be able to compile
419// Wasm modules.
420//
421// Traps:
422// * If the config ID doesn't exist.
423fn config_set_can_compile_modules<T>(mut caller: Caller<T>, config_id: u64, can: u32) -> Result<()>
424where
425    T: ProcessState + ProcessCtx<T>,
426    T::Config: ProcessConfigCtx,
427{
428    caller
429        .data_mut()
430        .config_resources_mut()
431        .get_mut(config_id)
432        .or_trap("lunatic::process::config_set_can_compile_modules: Config ID doesn't exist")?
433        .set_can_compile_modules(can != 0);
434    Ok(())
435}
436
437// Returns 1 if processes spawned from this configuration can create other configurations,
438// otherwise 0.
439//
440// Traps:
441// * If the config ID doesn't exist.
442fn config_can_create_configs<T>(caller: Caller<T>, config_id: u64) -> Result<u32>
443where
444    T: ProcessState + ProcessCtx<T>,
445    T::Config: ProcessConfigCtx,
446{
447    let can = caller
448        .data()
449        .config_resources()
450        .get(config_id)
451        .or_trap("lunatic::process::config_can_create_configs: Config ID doesn't exist")?
452        .can_create_configs();
453    Ok(can as u32)
454}
455
456// If set to a value >0 (true), processes spawned from this configuration will be able to create
457// other configuration.
458//
459// Traps:
460// * If the config ID doesn't exist.
461fn config_set_can_create_configs<T>(mut caller: Caller<T>, config_id: u64, can: u32) -> Result<()>
462where
463    T: ProcessState + ProcessCtx<T>,
464    T::Config: ProcessConfigCtx,
465{
466    caller
467        .data_mut()
468        .config_resources_mut()
469        .get_mut(config_id)
470        .or_trap("lunatic::process::config_set_can_create_configs: Config ID doesn't exist")?
471        .set_can_create_configs(can != 0);
472    Ok(())
473}
474
475// Returns 1 if processes spawned from this configuration can spawn sub-processes, otherwise 0.
476//
477// Traps:
478// * If the config ID doesn't exist.
479fn config_can_spawn_processes<T>(caller: Caller<T>, config_id: u64) -> Result<u32>
480where
481    T: ProcessState + ProcessCtx<T>,
482    T::Config: ProcessConfigCtx,
483{
484    let can = caller
485        .data()
486        .config_resources()
487        .get(config_id)
488        .or_trap("lunatic::process::config_can_spawn_processes: Config ID doesn't exist")?
489        .can_spawn_processes();
490    Ok(can as u32)
491}
492
493// If set to a value >0 (true), processes spawned from this configuration will be able to spawn
494// sub-processes.
495//
496// Traps:
497// * If the config ID doesn't exist.
498fn config_set_can_spawn_processes<T>(mut caller: Caller<T>, config_id: u64, can: u32) -> Result<()>
499where
500    T: ProcessState + ProcessCtx<T>,
501    T::Config: ProcessConfigCtx,
502{
503    caller
504        .data_mut()
505        .config_resources_mut()
506        .get_mut(config_id)
507        .or_trap("lunatic::process::config_set_can_spawn_processes: Config ID doesn't exist")?
508        .set_can_spawn_processes(can != 0);
509    Ok(())
510}
511
512// Spawns a new process using the passed in function inside a module as the entry point.
513//
514// If **link** is not 0, it will link the child and parent processes. The value of the **link**
515// argument will be used as the link-tag for the child. This means, if the child traps the parent
516// is going to get a signal back with the value used as the tag.
517//
518// If *config_id* or *module_id* have the value -1, the same module/config is used as in the
519// process calling this function.
520//
521// The function arguments are passed as an array with the following structure:
522// [0 byte = type ID; 1..17 bytes = value as u128, ...]
523// The type ID follows the WebAssembly binary convention:
524//  - 0x7F => i32
525//  - 0x7E => i64
526//  - 0x7B => v128
527// If any other value is used as type ID, this function will trap.
528//
529// Returns:
530// * 0 on success - The ID of the newly created process is written to **id_ptr**
531// * 1 on error   - The error ID is written to **id_ptr**
532//
533// Traps:
534// * If the module ID doesn't exist.
535// * If the function string is not a valid utf8 string.
536// * If the params array is in a wrong format.
537// * If any memory outside the guest heap space is referenced.
538#[allow(clippy::too_many_arguments)]
539fn spawn<T>(
540    mut caller: Caller<T>,
541    link: i64,
542    config_id: i64,
543    module_id: i64,
544    func_str_ptr: u32,
545    func_str_len: u32,
546    params_ptr: u32,
547    params_len: u32,
548    id_ptr: u32,
549) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
550where
551    T: ProcessState
552        + ProcessCtx<T>
553        + ErrorCtx
554        + LunaticWasiCtx
555        + ResourceLimiter
556        + Send
557        + Sync
558        + 'static,
559    for<'a> &'a T: Send,
560    T::Config: ProcessConfigCtx,
561{
562    Box::new(async move {
563        if !caller.data().config().can_spawn_processes() {
564            return Err(anyhow!(
565                "Process doesn't have permissions to spawn sub-processes"
566            ));
567        }
568
569        let env = caller.data().environment();
570        env.can_spawn_next_process()
571            .await
572            .or_trap("lunatic::process:spawn: Process spawn limit reached.")?;
573
574        let state = caller.data();
575
576        if !state.is_initialized() {
577            return Err(anyhow!("Cannot spawn process during module initialization"));
578        }
579
580        let config = match config_id {
581            -1 => state.config().clone(),
582            config_id => Arc::new(
583                caller
584                    .data()
585                    .config_resources()
586                    .get(config_id as u64)
587                    .or_trap("lunatic::process::spawn: Config ID doesn't exist")?
588                    .clone(),
589            ),
590        };
591
592        let module = match module_id {
593            -1 => state.module().clone(),
594            module_id => caller
595                .data()
596                .module_resources()
597                .get(module_id as u64)
598                .or_trap("lunatic::process::spawn: Module ID doesn't exist")?
599                .clone(),
600        };
601
602        let mut new_state = state.new_state(module.clone(), config)?;
603
604        let memory = get_memory(&mut caller)?;
605        let func_str = memory
606            .data(&caller)
607            .get(func_str_ptr as usize..(func_str_ptr + func_str_len) as usize)
608            .or_trap("lunatic::process::spawn")?;
609        let function = std::str::from_utf8(func_str).or_trap("lunatic::process::spawn")?;
610        let params = memory
611            .data(&caller)
612            .get(params_ptr as usize..(params_ptr + params_len) as usize)
613            .or_trap("lunatic::process::spawn")?;
614        let params_chunks = &mut params.chunks_exact(17);
615        let params = params_chunks
616            .map(|chunk| {
617                let value = u128::from_le_bytes(chunk[1..].try_into()?);
618                let result = match chunk[0] {
619                    0x7F => Val::I32(value as i32),
620                    0x7E => Val::I64(value as i64),
621                    0x7B => Val::V128(value),
622                    _ => return Err(anyhow!("Unsupported type ID")),
623                };
624                Ok(result)
625            })
626            .collect::<Result<Vec<_>>>()?;
627        if !params_chunks.remainder().is_empty() {
628            return Err(anyhow!(
629                "Params array must be in chunks of 17 bytes, but {} bytes remained",
630                params_chunks.remainder().len()
631            ));
632        }
633        // Should processes be linked together?
634        let link: Option<(Option<i64>, Arc<dyn Process>)> = match link {
635            0 => None,
636            tag => {
637                let id = caller.data().id();
638                let signal_mailbox = caller.data().signal_mailbox().clone();
639                let process = WasmProcess::new(id, signal_mailbox.0);
640                Some((Some(tag), Arc::new(process)))
641            }
642        };
643
644        let runtime = caller.data().runtime().clone();
645
646        // Inherit stdout and stderr streams if they are redirected by the parent.
647        let stdout = if let Some(stdout) = caller.data().get_stdout() {
648            let next_stream = stdout.next();
649            new_state.set_stdout(next_stream.clone());
650            Some((stdout.clone(), next_stream))
651        } else {
652            None
653        };
654        if let Some(stderr) = caller.data().get_stderr() {
655            // If stderr is same as stdout, use same `next_stream`.
656            if let Some((stdout, next_stream)) = stdout {
657                if &stdout == stderr {
658                    new_state.set_stderr(next_stream);
659                } else {
660                    new_state.set_stderr(stderr.next());
661                }
662            } else {
663                new_state.set_stderr(stderr.next());
664            }
665        }
666
667        // set state instead of config TODO
668        let env = caller.data().environment();
669        let (proc_or_error_id, result) = match lunatic_process::wasm::spawn_wasm(
670            env, runtime, &module, new_state, function, params, link,
671        )
672        .await
673        {
674            Ok((_, process)) => (process.id(), 0),
675            Err(error) => (caller.data_mut().error_resources_mut().add(error), 1),
676        };
677
678        memory
679            .write(caller, id_ptr as usize, &proc_or_error_id.to_le_bytes())
680            .or_trap("lunatic::process::spawn")?;
681        Ok(result)
682    })
683}
684
685// Looks up or spawns a new process.
686//
687// This function has a similar signature as `spawn`, but it first tries to look up a process in the registry
688// under `name`. If it exists returns it, if not spawns a new one and registers it under this name. This
689// operation is atomic. While a new process is being looked up and spawned, no other process can be inserted
690// into the registry under the same name.
691//
692// Different than spawn, the lookup can result in a process running on a different node. This means that the
693// node_id also needs to be returned through a pointer.
694//
695// Returns:
696// * 0 on success        - The ID of the newly created process is written to **id_ptr**
697// * 1 on error          - The error ID is written to **id_ptr**
698// * 2 on lookup success - The lookup found a process and the id is written to **id_ptr**
699//
700// Traps:
701// * If the name lookup string is not a valid utf8 string.
702// * If the module ID doesn't exist.
703// * If the function string is not a valid utf8 string.
704// * If the params array is in a wrong format.
705// * If any memory outside the guest heap space is referenced.
706#[allow(clippy::too_many_arguments)]
707fn get_or_spawn<T, E>(
708    mut caller: Caller<T>,
709    name_str_ptr: u32,
710    name_str_len: u32,
711    link: i64,
712    config_id: i64,
713    module_id: i64,
714    func_str_ptr: u32,
715    func_str_len: u32,
716    params_ptr: u32,
717    params_len: u32,
718    node_id_ptr: u32,
719    id_ptr: u32,
720) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
721where
722    T: ProcessState
723        + ProcessCtx<T>
724        + DistributedCtx<E>
725        + ErrorCtx
726        + LunaticWasiCtx
727        + ResourceLimiter
728        + Send
729        + Sync
730        + 'static,
731    for<'a> &'a T: Send,
732    T::Config: ProcessConfigCtx,
733    E: Environment,
734{
735    Box::new(async move {
736        let memory = get_memory(&mut caller)?;
737        let (memory_slice, state) = memory.data_and_store_mut(&mut caller);
738        let name = memory_slice
739            .get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize)
740            .or_trap("lunatic::process::get_or_spawn")?;
741        let name = std::str::from_utf8(name).or_trap("lunatic::process::get_or_spawn")?;
742
743        // Lock the registry for every other process before lookup.
744        let registry = state.registry().clone();
745        let mut registry = registry.write().await;
746        let process = registry.get(name).copied();
747
748        if let Some((node_id, process_id)) = process {
749            // Return the process from the registry.
750            memory_slice
751                .get_mut(node_id_ptr as usize..(node_id_ptr + 8) as usize)
752                .or_trap("lunatic::process::get_or_spawn")?
753                .write(&node_id.to_le_bytes())
754                .or_trap("lunatic::process::get_or_spawn")?;
755
756            memory_slice
757                .get_mut(id_ptr as usize..(id_ptr + 8) as usize)
758                .or_trap("lunatic::process::get_or_spawn")?
759                .write(&process_id.to_le_bytes())
760                .or_trap("lunatic::process::get_or_spawn")?;
761            Ok(2)
762        } else {
763            let name = name.to_owned();
764            // Spawn a new process. This is copy of the code in `spawn` because host functions can't call
765            // each other.
766            if !state.config().can_spawn_processes() {
767                return Err(anyhow!(
768                    "lunatic::process:get_or_spawn: Process doesn't have permissions to spawn sub-processes"
769                ));
770            }
771
772            let env = state.environment();
773            env.can_spawn_next_process()
774                .await
775                .or_trap("lunatic::process:get_or_spawn: Process spawn limit reached.")?;
776
777            if !state.is_initialized() {
778                return Err(
779                    anyhow!("lunatic::process:get_or_spawn: Cannot spawn process during module initialization")
780                );
781            }
782
783            let config = match config_id {
784                -1 => state.config().clone(),
785                config_id => Arc::new(
786                    state
787                        .config_resources()
788                        .get(config_id as u64)
789                        .or_trap("lunatic::process::get_or_spawn: Config ID doesn't exist")?
790                        .clone(),
791                ),
792            };
793
794            let module = match module_id {
795                -1 => state.module().clone(),
796                module_id => state
797                    .module_resources()
798                    .get(module_id as u64)
799                    .or_trap("lunatic::process::get_or_spawn: Module ID doesn't exist")?
800                    .clone(),
801            };
802
803            let mut new_state = state.new_state(module.clone(), config)?;
804
805            let func_str = memory_slice
806                .get(func_str_ptr as usize..(func_str_ptr + func_str_len) as usize)
807                .or_trap("lunatic::process::get_or_spawn")?;
808            let function =
809                std::str::from_utf8(func_str).or_trap("lunatic::process::get_or_spawn")?;
810            let params = memory_slice
811                .get(params_ptr as usize..(params_ptr + params_len) as usize)
812                .or_trap("lunatic::process::get_or_spawn")?;
813            let params_chunks = &mut params.chunks_exact(17);
814            let params = params_chunks
815                .map(|chunk| {
816                    let value = u128::from_le_bytes(chunk[1..].try_into()?);
817                    let result = match chunk[0] {
818                        0x7F => Val::I32(value as i32),
819                        0x7E => Val::I64(value as i64),
820                        0x7B => Val::V128(value),
821                        _ => return Err(anyhow!("Unsupported type ID")),
822                    };
823                    Ok(result)
824                })
825                .collect::<Result<Vec<_>>>()?;
826            if !params_chunks.remainder().is_empty() {
827                return Err(anyhow!(
828                    "Params array must be in chunks of 17 bytes, but {} bytes remained",
829                    params_chunks.remainder().len()
830                ));
831            }
832            // Should processes be linked together?
833            let link: Option<(Option<i64>, Arc<dyn Process>)> = match link {
834                0 => None,
835                tag => {
836                    let id = state.id();
837                    let signal_mailbox = state.signal_mailbox().clone();
838                    let process = WasmProcess::new(id, signal_mailbox.0);
839                    Some((Some(tag), Arc::new(process)))
840                }
841            };
842
843            let runtime = state.runtime().clone();
844
845            // Inherit stdout and stderr streams if they are redirected by the parent.
846            let stdout = if let Some(stdout) = state.get_stdout() {
847                let next_stream = stdout.next();
848                new_state.set_stdout(next_stream.clone());
849                Some((stdout.clone(), next_stream))
850            } else {
851                None
852            };
853            if let Some(stderr) = state.get_stderr() {
854                // If stderr is same as stdout, use same `next_stream`.
855                if let Some((stdout, next_stream)) = stdout {
856                    if &stdout == stderr {
857                        new_state.set_stderr(next_stream);
858                    } else {
859                        new_state.set_stderr(stderr.next());
860                    }
861                } else {
862                    new_state.set_stderr(stderr.next());
863                }
864            }
865
866            // set state instead of config TODO
867            let env = state.environment();
868            let (proc_or_error_id, result) = match lunatic_process::wasm::spawn_wasm(
869                env, runtime, &module, new_state, function, params, link,
870            )
871            .await
872            {
873                Ok((_, process)) => (process.id(), 0),
874                Err(error) => (state.error_resources_mut().add(error), 1),
875            };
876
877            let node_id = state
878                .distributed()
879                .as_ref()
880                .map(|d| d.node_id())
881                .unwrap_or(0);
882            memory_slice
883                .get_mut(node_id_ptr as usize..(node_id_ptr + 8) as usize)
884                .or_trap("lunatic::process::get_or_spawn")?
885                .write(&node_id.to_le_bytes())
886                .or_trap("lunatic::process::get_or_spawn")?;
887
888            memory_slice
889                .get_mut(id_ptr as usize..(id_ptr + 8) as usize)
890                .or_trap("lunatic::process::get_or_spawn")?
891                .write(&proc_or_error_id.to_le_bytes())
892                .or_trap("lunatic::process::get_or_spawn")?;
893
894            // Register newly spawned process under correct name
895            registry.insert(name, (node_id, proc_or_error_id));
896
897            Ok(result)
898        }
899    })
900}
901
902// lunatic::process::sleep_ms(millis: u64)
903//
904// Suspend process for `millis`.
905fn sleep_ms<T: ProcessState + ProcessCtx<T>>(
906    _: Caller<T>,
907    millis: u64,
908) -> Box<dyn Future<Output = ()> + Send + '_> {
909    Box::new(async move {
910        tokio::time::sleep(Duration::from_millis(millis)).await;
911    })
912}
913
914// Defines what happens to this process if one of the linked processes notifies us that it died.
915//
916// There are 2 options:
917// 1. `trap == 0` the received signal will be turned into a signal message and put into the mailbox.
918// 2. `trap != 0` the process will die and notify all linked processes of its death.
919//
920// The default behaviour for a newly spawned process is 2.
921fn die_when_link_dies<T: ProcessState + ProcessCtx<T>>(mut caller: Caller<T>, trap: u32) {
922    caller
923        .data_mut()
924        .signal_mailbox()
925        .0
926        .send(Signal::DieWhenLinkDies(trap != 0))
927        .expect("The signal is sent to itself and the receiver must exist at this point");
928}
929
930// Returns ID of the process currently running
931fn process_id<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>) -> u64 {
932    caller.data().id()
933}
934
935// Returns ID of the environment in which the process is currently running
936fn environment_id<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>) -> u64 {
937    caller.data().environment().id()
938}
939
940// Link current process to **process_id**. This is not an atomic operation, any of the 2 processes
941// could fail before processing the `Link` signal and may not notify the other.
942//
943// Traps:
944// * If the process ID doesn't exist.
945fn link<T: ProcessState + ProcessCtx<T>>(
946    mut caller: Caller<T>,
947    tag: i64,
948    process_id: u64,
949) -> Result<()> {
950    let tag = match tag {
951        0 => None,
952        tag => Some(tag),
953    };
954    // Create handle to itself
955    let id = caller.data().id();
956    let signal_mailbox = caller.data().signal_mailbox().clone();
957    let this_process = WasmProcess::new(id, signal_mailbox.0);
958
959    // Send link signal to other process
960    let process = caller.data().environment().get_process(process_id);
961
962    if let Some(process) = process {
963        process.send(Signal::Link(tag, Arc::new(this_process)));
964
965        // Send link signal to itself
966        caller
967            .data_mut()
968            .signal_mailbox()
969            .0
970            .send(Signal::Link(tag, process))
971            .expect("The Link signal is sent to itself and the receiver must exist at this point");
972    } else {
973        caller
974            .data_mut()
975            .signal_mailbox()
976            .0
977            .send(Signal::LinkDied(process_id, tag, DeathReason::NoProcess))
978            .expect(
979                "The LinkDied signal is sent to itself and the receiver must exist at this point",
980            );
981    }
982    Ok(())
983}
984
985// Unlink current process from **process_id**. This is not an atomic operation.
986//
987// Traps:
988// * If the process ID doesn't exist.
989fn unlink<T: ProcessState + ProcessCtx<T>>(mut caller: Caller<T>, process_id: u64) -> Result<()> {
990    // Create handle to itself
991    let this_process_id = caller.data().id();
992
993    // Send unlink signal to other process
994    let process = caller.data().environment().get_process(process_id);
995
996    if let Some(process) = process {
997        process.send(Signal::UnLink {
998            process_id: this_process_id,
999        });
1000    }
1001
1002    // Send unlink signal to itself
1003    caller
1004        .data_mut()
1005        .signal_mailbox()
1006        .0
1007        .send(Signal::UnLink { process_id })
1008        .expect("The signal is sent to itself and the receiver must exist at this point");
1009
1010    Ok(())
1011}
1012
1013// Start monitoring **process_id**. This is not an atomic operation.
1014//
1015// Traps:
1016// * If the process ID doesn't exist.
1017fn monitor<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>, process_id: u64) -> Result<()> {
1018    // Send link signal to other process
1019    let process = caller.data().environment().get_process(process_id);
1020
1021    if let Some(process) = process {
1022        let id = caller.data().id();
1023        let signal_mailbox = caller.data().signal_mailbox().clone();
1024        let this_process = WasmProcess::new(id, signal_mailbox.0);
1025        process.send(Signal::Monitor(Arc::new(this_process)));
1026    }
1027
1028    Ok(())
1029}
1030
1031// Stop monitoring **process_id**. This is not an atomic operation.
1032//
1033// Traps:
1034// * If the process ID doesn't exist.
1035fn stop_monitoring<T: ProcessState + ProcessCtx<T>>(
1036    caller: Caller<T>,
1037    process_id: u64,
1038) -> Result<()> {
1039    // Create handle to itself
1040    let this_process_id = caller.data().id();
1041
1042    // Send unlink signal to other process
1043    let process = caller.data().environment().get_process(process_id);
1044
1045    if let Some(process) = process {
1046        process.send(Signal::StopMonitoring {
1047            process_id: this_process_id,
1048        });
1049    }
1050
1051    Ok(())
1052}
1053
1054// Send a Kill signal to **process_id**.
1055//
1056// Traps:
1057// * If the process ID doesn't exist.
1058fn kill<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>, process_id: u64) -> Result<()> {
1059    // Send kill signal to process
1060    if let Some(process) = caller.data().environment().get_process(process_id) {
1061        process.send(Signal::Kill);
1062    }
1063    Ok(())
1064}
1065
1066// Checks to see if a process exists
1067fn exists<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>, process_id: u64) -> i32 {
1068    caller
1069        .data()
1070        .environment()
1071        .get_process(process_id)
1072        .is_some() as i32
1073}