1use std::{
2 convert::{TryFrom, TryInto},
3 future::Future,
4 io::Write,
5 path::Path,
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use anyhow::{anyhow, Result};
11use hash_map_id::HashMapId;
12use lunatic_common_api::{get_memory, IntoTrap};
13use lunatic_distributed::DistributedCtx;
14use lunatic_error_api::ErrorCtx;
15use lunatic_process::{
16 config::ProcessConfig,
17 env::Environment,
18 mailbox::MessageMailbox,
19 message::Message,
20 runtimes::{wasmtime::WasmtimeCompiledModule, RawWasm},
21 state::ProcessState,
22 DeathReason, Process, Signal, WasmProcess,
23};
24use lunatic_wasi_api::LunaticWasiCtx;
25use wasmtime::{Caller, Linker, ResourceLimiter, Val};
26
27pub type ProcessResources = HashMapId<Arc<dyn Process>>;
28pub type ModuleResources<S> = HashMapId<Arc<WasmtimeCompiledModule<S>>>;
29
30pub trait ProcessConfigCtx {
31 fn can_compile_modules(&self) -> bool;
32 fn set_can_compile_modules(&mut self, can: bool);
33 fn can_create_configs(&self) -> bool;
34 fn set_can_create_configs(&mut self, can: bool);
35 fn can_spawn_processes(&self) -> bool;
36 fn set_can_spawn_processes(&mut self, can: bool);
37 fn can_access_fs_location(&self, path: &Path) -> Result<(), String>;
38}
39
40pub trait ProcessCtx<S: ProcessState> {
41 fn mailbox(&mut self) -> &mut MessageMailbox;
42 fn message_scratch_area(&mut self) -> &mut Option<Message>;
43 fn module_resources(&self) -> &ModuleResources<S>;
44 fn module_resources_mut(&mut self) -> &mut ModuleResources<S>;
45 fn environment(&self) -> Arc<dyn Environment>;
46}
47
48pub fn register<T, E>(linker: &mut Linker<T>) -> Result<()>
50where
51 T: ProcessState
52 + ProcessCtx<T>
53 + DistributedCtx<E>
54 + ErrorCtx
55 + LunaticWasiCtx
56 + Send
57 + Sync
58 + ResourceLimiter
59 + 'static,
60 for<'a> &'a T: Send,
61 T::Config: ProcessConfigCtx,
62 E: Environment + 'static,
63{
64 #[cfg(feature = "metrics")]
65 lunatic_process::describe_metrics();
66
67 #[cfg(feature = "metrics")]
68 metrics::describe_counter!(
69 "lunatic.process.modules.compiled",
70 metrics::Unit::Count,
71 "number of modules compiled since startup"
72 );
73
74 #[cfg(feature = "metrics")]
75 metrics::describe_counter!(
76 "lunatic.process.modules.dropped",
77 metrics::Unit::Count,
78 "number of modules dropped since startup"
79 );
80
81 #[cfg(feature = "metrics")]
82 metrics::describe_gauge!(
83 "lunatic.process.modules.active",
84 metrics::Unit::Count,
85 "number of modules currently in memory"
86 );
87
88 #[cfg(feature = "metrics")]
89 metrics::describe_histogram!(
90 "lunatic.process.modules.compiled.duration",
91 metrics::Unit::Seconds,
92 "Duration of module compilation"
93 );
94
95 linker.func_wrap("lunatic::process", "compile_module", compile_module)?;
96 linker.func_wrap("lunatic::process", "drop_module", drop_module)?;
97
98 #[cfg(feature = "metrics")]
99 metrics::describe_counter!(
100 "lunatic.process.configs.created",
101 metrics::Unit::Count,
102 "number of configs created since startup"
103 );
104
105 #[cfg(feature = "metrics")]
106 metrics::describe_counter!(
107 "lunatic.process.configs.dropped",
108 metrics::Unit::Count,
109 "number of configs dropped since startup"
110 );
111
112 #[cfg(feature = "metrics")]
113 metrics::describe_gauge!(
114 "lunatic.process.configs.active",
115 metrics::Unit::Count,
116 "number of configs currently in memory"
117 );
118
119 linker.func_wrap("lunatic::process", "create_config", create_config)?;
120 linker.func_wrap("lunatic::process", "drop_config", drop_config)?;
121 linker.func_wrap(
122 "lunatic::process",
123 "config_set_max_memory",
124 config_set_max_memory,
125 )?;
126 linker.func_wrap(
127 "lunatic::process",
128 "config_get_max_memory",
129 config_get_max_memory,
130 )?;
131 linker.func_wrap(
132 "lunatic::process",
133 "config_set_max_fuel",
134 config_set_max_fuel,
135 )?;
136 linker.func_wrap(
137 "lunatic::process",
138 "config_get_max_fuel",
139 config_get_max_fuel,
140 )?;
141 linker.func_wrap(
142 "lunatic::process",
143 "config_can_compile_modules",
144 config_can_compile_modules,
145 )?;
146 linker.func_wrap(
147 "lunatic::process",
148 "config_set_can_compile_modules",
149 config_set_can_compile_modules,
150 )?;
151 linker.func_wrap(
152 "lunatic::process",
153 "config_can_create_configs",
154 config_can_create_configs,
155 )?;
156 linker.func_wrap(
157 "lunatic::process",
158 "config_set_can_create_configs",
159 config_set_can_create_configs,
160 )?;
161 linker.func_wrap(
162 "lunatic::process",
163 "config_can_spawn_processes",
164 config_can_spawn_processes,
165 )?;
166 linker.func_wrap(
167 "lunatic::process",
168 "config_set_can_spawn_processes",
169 config_set_can_spawn_processes,
170 )?;
171
172 linker.func_wrap8_async("lunatic::process", "spawn", spawn)?;
173 linker.func_wrap11_async("lunatic::process", "get_or_spawn", get_or_spawn)?;
174 linker.func_wrap1_async("lunatic::process", "sleep_ms", sleep_ms)?;
175 linker.func_wrap("lunatic::process", "die_when_link_dies", die_when_link_dies)?;
176
177 linker.func_wrap("lunatic::process", "process_id", process_id)?;
178 linker.func_wrap("lunatic::process", "environment_id", environment_id)?;
179 linker.func_wrap("lunatic::process", "link", link)?;
180 linker.func_wrap("lunatic::process", "unlink", unlink)?;
181 linker.func_wrap("lunatic::process", "monitor", monitor)?;
182 linker.func_wrap("lunatic::process", "stop_monitoring", stop_monitoring)?;
183 linker.func_wrap("lunatic::process", "kill", kill)?;
184 linker.func_wrap("lunatic::process", "exists", exists)?;
185 Ok(())
186}
187
188fn compile_module<T>(
198 mut caller: Caller<T>,
199 module_data_ptr: u32,
200 module_data_len: u32,
201 id_ptr: u32,
202) -> Result<i32>
203where
204 T: ProcessState + ProcessCtx<T> + ErrorCtx,
205 T::Config: ProcessConfigCtx,
206{
207 if !caller.data().config().can_compile_modules() {
209 return Ok(-1);
210 }
211
212 #[cfg(feature = "metrics")]
213 metrics::increment_counter!("lunatic.process.modules.compiled");
214
215 #[cfg(feature = "metrics")]
216 metrics::increment_gauge!("lunatic.process.modules.active", 1.0);
217
218 #[cfg(feature = "metrics")]
219 let start = Instant::now();
220
221 let mut module = vec![0; module_data_len as usize];
222 let memory = get_memory(&mut caller)?;
223 memory
224 .read(&caller, module_data_ptr as usize, module.as_mut_slice())
225 .or_trap("lunatic::process::compile_module")?;
226
227 let module = RawWasm::new(None, module);
228 let (mod_or_error_id, result) = match caller.data().runtime().compile_module(module) {
229 Ok(module) => (
230 caller
231 .data_mut()
232 .module_resources_mut()
233 .add(Arc::new(module)),
234 0,
235 ),
236 Err(error) => (caller.data_mut().error_resources_mut().add(error), 1),
237 };
238
239 #[cfg(feature = "metrics")]
240 let duration = Instant::now() - start;
241 #[cfg(feature = "metrics")]
242 metrics::histogram!("lunatic.process.modules.compiled.duration", duration);
243
244 memory
245 .write(&mut caller, id_ptr as usize, &mod_or_error_id.to_le_bytes())
246 .or_trap("lunatic::process::compile_module")?;
247 Ok(result)
248}
249
250fn drop_module<T: ProcessState + ProcessCtx<T>>(
255 mut caller: Caller<T>,
256 module_id: u64,
257) -> Result<()> {
258 #[cfg(feature = "metrics")]
259 metrics::increment_counter!("lunatic.process.modules.dropped");
260
261 #[cfg(feature = "metrics")]
262 metrics::decrement_gauge!("lunatic.process.modules.active", 1.0);
263
264 caller
265 .data_mut()
266 .module_resources_mut()
267 .remove(module_id)
268 .or_trap("lunatic::process::drop_module: Module ID doesn't exist")?;
269 Ok(())
270}
271
272fn create_config<T>(mut caller: Caller<T>) -> i64
280where
281 T: ProcessState + ProcessCtx<T>,
282 T::Config: ProcessConfigCtx,
283{
284 if !caller.data().config().can_create_configs() {
285 return -1;
286 }
287 let config = T::Config::default();
288 #[cfg(feature = "metrics")]
289 metrics::increment_counter!("lunatic.process.configs.created");
290 #[cfg(feature = "metrics")]
291 metrics::increment_gauge!("lunatic.process.configs.active", 1.0);
292 caller.data_mut().config_resources_mut().add(config) as i64
293}
294
295fn drop_config<T: ProcessState + ProcessCtx<T>>(
300 mut caller: Caller<T>,
301 config_id: u64,
302) -> Result<()> {
303 caller
304 .data_mut()
305 .config_resources_mut()
306 .remove(config_id)
307 .or_trap("lunatic::process::drop_config: Config ID doesn't exist")?;
308 #[cfg(feature = "metrics")]
309 metrics::increment_counter!("lunatic.process.configs.dropped");
310 #[cfg(feature = "metrics")]
311 metrics::decrement_gauge!("lunatic.process.configs.active", 1.0);
312 Ok(())
313}
314
315fn config_set_max_memory<T: ProcessState + ProcessCtx<T>>(
321 mut caller: Caller<T>,
322 config_id: u64,
323 max_memory: u64,
324) -> Result<()> {
325 let max_memory = usize::try_from(max_memory)
326 .or_trap("lunatic::process::config_set_max_memory: max_memory exceeds platform max")?;
327 caller
328 .data_mut()
329 .config_resources_mut()
330 .get_mut(config_id)
331 .or_trap("lunatic::process::config_set_max_memory: Config ID doesn't exist")?
332 .set_max_memory(max_memory);
333 Ok(())
334}
335
336fn config_get_max_memory<T: ProcessState + ProcessCtx<T>>(
341 caller: Caller<T>,
342 config_id: u64,
343) -> Result<u64> {
344 let max_memory = caller
345 .data()
346 .config_resources()
347 .get(config_id)
348 .or_trap("lunatic::process::config_get_max_memory: Config ID doesn't exist")?
349 .get_max_memory();
350 Ok(max_memory as u64)
351}
352
353fn config_set_max_fuel<T: ProcessState + ProcessCtx<T>>(
360 mut caller: Caller<T>,
361 config_id: u64,
362 max_fuel: u64,
363) -> Result<()> {
364 let max_fuel = match max_fuel {
365 0 => None,
366 max_fuel => Some(max_fuel),
367 };
368
369 caller
370 .data_mut()
371 .config_resources_mut()
372 .get_mut(config_id)
373 .or_trap("lunatic::process::config_set_max_fuel: Config ID doesn't exist")?
374 .set_max_fuel(max_fuel);
375 Ok(())
376}
377
378fn config_get_max_fuel<T: ProcessState + ProcessCtx<T>>(
385 caller: Caller<T>,
386 config_id: u64,
387) -> Result<u64> {
388 let max_fuel = caller
389 .data()
390 .config_resources()
391 .get(config_id)
392 .or_trap("lunatic::process::config_get_max_fuel: Config ID doesn't exist")?
393 .get_max_fuel();
394 match max_fuel {
395 None => Ok(0),
396 Some(max_fuel) => Ok(max_fuel),
397 }
398}
399
400fn config_can_compile_modules<T>(caller: Caller<T>, config_id: u64) -> Result<u32>
405where
406 T: ProcessState + ProcessCtx<T>,
407 T::Config: ProcessConfigCtx,
408{
409 let can = caller
410 .data()
411 .config_resources()
412 .get(config_id)
413 .or_trap("lunatic::process::config_can_compile_modules: Config ID doesn't exist")?
414 .can_compile_modules();
415 Ok(can as u32)
416}
417
418fn config_set_can_compile_modules<T>(mut caller: Caller<T>, config_id: u64, can: u32) -> Result<()>
424where
425 T: ProcessState + ProcessCtx<T>,
426 T::Config: ProcessConfigCtx,
427{
428 caller
429 .data_mut()
430 .config_resources_mut()
431 .get_mut(config_id)
432 .or_trap("lunatic::process::config_set_can_compile_modules: Config ID doesn't exist")?
433 .set_can_compile_modules(can != 0);
434 Ok(())
435}
436
437fn config_can_create_configs<T>(caller: Caller<T>, config_id: u64) -> Result<u32>
443where
444 T: ProcessState + ProcessCtx<T>,
445 T::Config: ProcessConfigCtx,
446{
447 let can = caller
448 .data()
449 .config_resources()
450 .get(config_id)
451 .or_trap("lunatic::process::config_can_create_configs: Config ID doesn't exist")?
452 .can_create_configs();
453 Ok(can as u32)
454}
455
456fn config_set_can_create_configs<T>(mut caller: Caller<T>, config_id: u64, can: u32) -> Result<()>
462where
463 T: ProcessState + ProcessCtx<T>,
464 T::Config: ProcessConfigCtx,
465{
466 caller
467 .data_mut()
468 .config_resources_mut()
469 .get_mut(config_id)
470 .or_trap("lunatic::process::config_set_can_create_configs: Config ID doesn't exist")?
471 .set_can_create_configs(can != 0);
472 Ok(())
473}
474
475fn config_can_spawn_processes<T>(caller: Caller<T>, config_id: u64) -> Result<u32>
480where
481 T: ProcessState + ProcessCtx<T>,
482 T::Config: ProcessConfigCtx,
483{
484 let can = caller
485 .data()
486 .config_resources()
487 .get(config_id)
488 .or_trap("lunatic::process::config_can_spawn_processes: Config ID doesn't exist")?
489 .can_spawn_processes();
490 Ok(can as u32)
491}
492
493fn config_set_can_spawn_processes<T>(mut caller: Caller<T>, config_id: u64, can: u32) -> Result<()>
499where
500 T: ProcessState + ProcessCtx<T>,
501 T::Config: ProcessConfigCtx,
502{
503 caller
504 .data_mut()
505 .config_resources_mut()
506 .get_mut(config_id)
507 .or_trap("lunatic::process::config_set_can_spawn_processes: Config ID doesn't exist")?
508 .set_can_spawn_processes(can != 0);
509 Ok(())
510}
511
512#[allow(clippy::too_many_arguments)]
539fn spawn<T>(
540 mut caller: Caller<T>,
541 link: i64,
542 config_id: i64,
543 module_id: i64,
544 func_str_ptr: u32,
545 func_str_len: u32,
546 params_ptr: u32,
547 params_len: u32,
548 id_ptr: u32,
549) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
550where
551 T: ProcessState
552 + ProcessCtx<T>
553 + ErrorCtx
554 + LunaticWasiCtx
555 + ResourceLimiter
556 + Send
557 + Sync
558 + 'static,
559 for<'a> &'a T: Send,
560 T::Config: ProcessConfigCtx,
561{
562 Box::new(async move {
563 if !caller.data().config().can_spawn_processes() {
564 return Err(anyhow!(
565 "Process doesn't have permissions to spawn sub-processes"
566 ));
567 }
568
569 let env = caller.data().environment();
570 env.can_spawn_next_process()
571 .await
572 .or_trap("lunatic::process:spawn: Process spawn limit reached.")?;
573
574 let state = caller.data();
575
576 if !state.is_initialized() {
577 return Err(anyhow!("Cannot spawn process during module initialization"));
578 }
579
580 let config = match config_id {
581 -1 => state.config().clone(),
582 config_id => Arc::new(
583 caller
584 .data()
585 .config_resources()
586 .get(config_id as u64)
587 .or_trap("lunatic::process::spawn: Config ID doesn't exist")?
588 .clone(),
589 ),
590 };
591
592 let module = match module_id {
593 -1 => state.module().clone(),
594 module_id => caller
595 .data()
596 .module_resources()
597 .get(module_id as u64)
598 .or_trap("lunatic::process::spawn: Module ID doesn't exist")?
599 .clone(),
600 };
601
602 let mut new_state = state.new_state(module.clone(), config)?;
603
604 let memory = get_memory(&mut caller)?;
605 let func_str = memory
606 .data(&caller)
607 .get(func_str_ptr as usize..(func_str_ptr + func_str_len) as usize)
608 .or_trap("lunatic::process::spawn")?;
609 let function = std::str::from_utf8(func_str).or_trap("lunatic::process::spawn")?;
610 let params = memory
611 .data(&caller)
612 .get(params_ptr as usize..(params_ptr + params_len) as usize)
613 .or_trap("lunatic::process::spawn")?;
614 let params_chunks = &mut params.chunks_exact(17);
615 let params = params_chunks
616 .map(|chunk| {
617 let value = u128::from_le_bytes(chunk[1..].try_into()?);
618 let result = match chunk[0] {
619 0x7F => Val::I32(value as i32),
620 0x7E => Val::I64(value as i64),
621 0x7B => Val::V128(value),
622 _ => return Err(anyhow!("Unsupported type ID")),
623 };
624 Ok(result)
625 })
626 .collect::<Result<Vec<_>>>()?;
627 if !params_chunks.remainder().is_empty() {
628 return Err(anyhow!(
629 "Params array must be in chunks of 17 bytes, but {} bytes remained",
630 params_chunks.remainder().len()
631 ));
632 }
633 let link: Option<(Option<i64>, Arc<dyn Process>)> = match link {
635 0 => None,
636 tag => {
637 let id = caller.data().id();
638 let signal_mailbox = caller.data().signal_mailbox().clone();
639 let process = WasmProcess::new(id, signal_mailbox.0);
640 Some((Some(tag), Arc::new(process)))
641 }
642 };
643
644 let runtime = caller.data().runtime().clone();
645
646 let stdout = if let Some(stdout) = caller.data().get_stdout() {
648 let next_stream = stdout.next();
649 new_state.set_stdout(next_stream.clone());
650 Some((stdout.clone(), next_stream))
651 } else {
652 None
653 };
654 if let Some(stderr) = caller.data().get_stderr() {
655 if let Some((stdout, next_stream)) = stdout {
657 if &stdout == stderr {
658 new_state.set_stderr(next_stream);
659 } else {
660 new_state.set_stderr(stderr.next());
661 }
662 } else {
663 new_state.set_stderr(stderr.next());
664 }
665 }
666
667 let env = caller.data().environment();
669 let (proc_or_error_id, result) = match lunatic_process::wasm::spawn_wasm(
670 env, runtime, &module, new_state, function, params, link,
671 )
672 .await
673 {
674 Ok((_, process)) => (process.id(), 0),
675 Err(error) => (caller.data_mut().error_resources_mut().add(error), 1),
676 };
677
678 memory
679 .write(caller, id_ptr as usize, &proc_or_error_id.to_le_bytes())
680 .or_trap("lunatic::process::spawn")?;
681 Ok(result)
682 })
683}
684
685#[allow(clippy::too_many_arguments)]
707fn get_or_spawn<T, E>(
708 mut caller: Caller<T>,
709 name_str_ptr: u32,
710 name_str_len: u32,
711 link: i64,
712 config_id: i64,
713 module_id: i64,
714 func_str_ptr: u32,
715 func_str_len: u32,
716 params_ptr: u32,
717 params_len: u32,
718 node_id_ptr: u32,
719 id_ptr: u32,
720) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
721where
722 T: ProcessState
723 + ProcessCtx<T>
724 + DistributedCtx<E>
725 + ErrorCtx
726 + LunaticWasiCtx
727 + ResourceLimiter
728 + Send
729 + Sync
730 + 'static,
731 for<'a> &'a T: Send,
732 T::Config: ProcessConfigCtx,
733 E: Environment,
734{
735 Box::new(async move {
736 let memory = get_memory(&mut caller)?;
737 let (memory_slice, state) = memory.data_and_store_mut(&mut caller);
738 let name = memory_slice
739 .get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize)
740 .or_trap("lunatic::process::get_or_spawn")?;
741 let name = std::str::from_utf8(name).or_trap("lunatic::process::get_or_spawn")?;
742
743 let registry = state.registry().clone();
745 let mut registry = registry.write().await;
746 let process = registry.get(name).copied();
747
748 if let Some((node_id, process_id)) = process {
749 memory_slice
751 .get_mut(node_id_ptr as usize..(node_id_ptr + 8) as usize)
752 .or_trap("lunatic::process::get_or_spawn")?
753 .write(&node_id.to_le_bytes())
754 .or_trap("lunatic::process::get_or_spawn")?;
755
756 memory_slice
757 .get_mut(id_ptr as usize..(id_ptr + 8) as usize)
758 .or_trap("lunatic::process::get_or_spawn")?
759 .write(&process_id.to_le_bytes())
760 .or_trap("lunatic::process::get_or_spawn")?;
761 Ok(2)
762 } else {
763 let name = name.to_owned();
764 if !state.config().can_spawn_processes() {
767 return Err(anyhow!(
768 "lunatic::process:get_or_spawn: Process doesn't have permissions to spawn sub-processes"
769 ));
770 }
771
772 let env = state.environment();
773 env.can_spawn_next_process()
774 .await
775 .or_trap("lunatic::process:get_or_spawn: Process spawn limit reached.")?;
776
777 if !state.is_initialized() {
778 return Err(
779 anyhow!("lunatic::process:get_or_spawn: Cannot spawn process during module initialization")
780 );
781 }
782
783 let config = match config_id {
784 -1 => state.config().clone(),
785 config_id => Arc::new(
786 state
787 .config_resources()
788 .get(config_id as u64)
789 .or_trap("lunatic::process::get_or_spawn: Config ID doesn't exist")?
790 .clone(),
791 ),
792 };
793
794 let module = match module_id {
795 -1 => state.module().clone(),
796 module_id => state
797 .module_resources()
798 .get(module_id as u64)
799 .or_trap("lunatic::process::get_or_spawn: Module ID doesn't exist")?
800 .clone(),
801 };
802
803 let mut new_state = state.new_state(module.clone(), config)?;
804
805 let func_str = memory_slice
806 .get(func_str_ptr as usize..(func_str_ptr + func_str_len) as usize)
807 .or_trap("lunatic::process::get_or_spawn")?;
808 let function =
809 std::str::from_utf8(func_str).or_trap("lunatic::process::get_or_spawn")?;
810 let params = memory_slice
811 .get(params_ptr as usize..(params_ptr + params_len) as usize)
812 .or_trap("lunatic::process::get_or_spawn")?;
813 let params_chunks = &mut params.chunks_exact(17);
814 let params = params_chunks
815 .map(|chunk| {
816 let value = u128::from_le_bytes(chunk[1..].try_into()?);
817 let result = match chunk[0] {
818 0x7F => Val::I32(value as i32),
819 0x7E => Val::I64(value as i64),
820 0x7B => Val::V128(value),
821 _ => return Err(anyhow!("Unsupported type ID")),
822 };
823 Ok(result)
824 })
825 .collect::<Result<Vec<_>>>()?;
826 if !params_chunks.remainder().is_empty() {
827 return Err(anyhow!(
828 "Params array must be in chunks of 17 bytes, but {} bytes remained",
829 params_chunks.remainder().len()
830 ));
831 }
832 let link: Option<(Option<i64>, Arc<dyn Process>)> = match link {
834 0 => None,
835 tag => {
836 let id = state.id();
837 let signal_mailbox = state.signal_mailbox().clone();
838 let process = WasmProcess::new(id, signal_mailbox.0);
839 Some((Some(tag), Arc::new(process)))
840 }
841 };
842
843 let runtime = state.runtime().clone();
844
845 let stdout = if let Some(stdout) = state.get_stdout() {
847 let next_stream = stdout.next();
848 new_state.set_stdout(next_stream.clone());
849 Some((stdout.clone(), next_stream))
850 } else {
851 None
852 };
853 if let Some(stderr) = state.get_stderr() {
854 if let Some((stdout, next_stream)) = stdout {
856 if &stdout == stderr {
857 new_state.set_stderr(next_stream);
858 } else {
859 new_state.set_stderr(stderr.next());
860 }
861 } else {
862 new_state.set_stderr(stderr.next());
863 }
864 }
865
866 let env = state.environment();
868 let (proc_or_error_id, result) = match lunatic_process::wasm::spawn_wasm(
869 env, runtime, &module, new_state, function, params, link,
870 )
871 .await
872 {
873 Ok((_, process)) => (process.id(), 0),
874 Err(error) => (state.error_resources_mut().add(error), 1),
875 };
876
877 let node_id = state
878 .distributed()
879 .as_ref()
880 .map(|d| d.node_id())
881 .unwrap_or(0);
882 memory_slice
883 .get_mut(node_id_ptr as usize..(node_id_ptr + 8) as usize)
884 .or_trap("lunatic::process::get_or_spawn")?
885 .write(&node_id.to_le_bytes())
886 .or_trap("lunatic::process::get_or_spawn")?;
887
888 memory_slice
889 .get_mut(id_ptr as usize..(id_ptr + 8) as usize)
890 .or_trap("lunatic::process::get_or_spawn")?
891 .write(&proc_or_error_id.to_le_bytes())
892 .or_trap("lunatic::process::get_or_spawn")?;
893
894 registry.insert(name, (node_id, proc_or_error_id));
896
897 Ok(result)
898 }
899 })
900}
901
902fn sleep_ms<T: ProcessState + ProcessCtx<T>>(
906 _: Caller<T>,
907 millis: u64,
908) -> Box<dyn Future<Output = ()> + Send + '_> {
909 Box::new(async move {
910 tokio::time::sleep(Duration::from_millis(millis)).await;
911 })
912}
913
914fn die_when_link_dies<T: ProcessState + ProcessCtx<T>>(mut caller: Caller<T>, trap: u32) {
922 caller
923 .data_mut()
924 .signal_mailbox()
925 .0
926 .send(Signal::DieWhenLinkDies(trap != 0))
927 .expect("The signal is sent to itself and the receiver must exist at this point");
928}
929
930fn process_id<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>) -> u64 {
932 caller.data().id()
933}
934
935fn environment_id<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>) -> u64 {
937 caller.data().environment().id()
938}
939
940fn link<T: ProcessState + ProcessCtx<T>>(
946 mut caller: Caller<T>,
947 tag: i64,
948 process_id: u64,
949) -> Result<()> {
950 let tag = match tag {
951 0 => None,
952 tag => Some(tag),
953 };
954 let id = caller.data().id();
956 let signal_mailbox = caller.data().signal_mailbox().clone();
957 let this_process = WasmProcess::new(id, signal_mailbox.0);
958
959 let process = caller.data().environment().get_process(process_id);
961
962 if let Some(process) = process {
963 process.send(Signal::Link(tag, Arc::new(this_process)));
964
965 caller
967 .data_mut()
968 .signal_mailbox()
969 .0
970 .send(Signal::Link(tag, process))
971 .expect("The Link signal is sent to itself and the receiver must exist at this point");
972 } else {
973 caller
974 .data_mut()
975 .signal_mailbox()
976 .0
977 .send(Signal::LinkDied(process_id, tag, DeathReason::NoProcess))
978 .expect(
979 "The LinkDied signal is sent to itself and the receiver must exist at this point",
980 );
981 }
982 Ok(())
983}
984
985fn unlink<T: ProcessState + ProcessCtx<T>>(mut caller: Caller<T>, process_id: u64) -> Result<()> {
990 let this_process_id = caller.data().id();
992
993 let process = caller.data().environment().get_process(process_id);
995
996 if let Some(process) = process {
997 process.send(Signal::UnLink {
998 process_id: this_process_id,
999 });
1000 }
1001
1002 caller
1004 .data_mut()
1005 .signal_mailbox()
1006 .0
1007 .send(Signal::UnLink { process_id })
1008 .expect("The signal is sent to itself and the receiver must exist at this point");
1009
1010 Ok(())
1011}
1012
1013fn monitor<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>, process_id: u64) -> Result<()> {
1018 let process = caller.data().environment().get_process(process_id);
1020
1021 if let Some(process) = process {
1022 let id = caller.data().id();
1023 let signal_mailbox = caller.data().signal_mailbox().clone();
1024 let this_process = WasmProcess::new(id, signal_mailbox.0);
1025 process.send(Signal::Monitor(Arc::new(this_process)));
1026 }
1027
1028 Ok(())
1029}
1030
1031fn stop_monitoring<T: ProcessState + ProcessCtx<T>>(
1036 caller: Caller<T>,
1037 process_id: u64,
1038) -> Result<()> {
1039 let this_process_id = caller.data().id();
1041
1042 let process = caller.data().environment().get_process(process_id);
1044
1045 if let Some(process) = process {
1046 process.send(Signal::StopMonitoring {
1047 process_id: this_process_id,
1048 });
1049 }
1050
1051 Ok(())
1052}
1053
1054fn kill<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>, process_id: u64) -> Result<()> {
1059 if let Some(process) = caller.data().environment().get_process(process_id) {
1061 process.send(Signal::Kill);
1062 }
1063 Ok(())
1064}
1065
1066fn exists<T: ProcessState + ProcessCtx<T>>(caller: Caller<T>, process_id: u64) -> i32 {
1068 caller
1069 .data()
1070 .environment()
1071 .get_process(process_id)
1072 .is_some() as i32
1073}