lunatic_runtime/
state.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::sync::Arc;
4
5use anyhow::Result;
6use hash_map_id::HashMapId;
7use lunatic_distributed::{DistributedCtx, DistributedProcessState};
8use lunatic_error_api::{ErrorCtx, ErrorResource};
9use lunatic_networking_api::{DnsIterator, TlsConnection, TlsListener};
10use lunatic_networking_api::{NetworkingCtx, TcpConnection};
11use lunatic_process::env::{Environment, LunaticEnvironment};
12use lunatic_process::runtimes::wasmtime::{WasmtimeCompiledModule, WasmtimeRuntime};
13use lunatic_process::state::{ConfigResources, ProcessState};
14use lunatic_process::{
15    config::ProcessConfig,
16    state::{SignalReceiver, SignalSender},
17};
18use lunatic_process::{mailbox::MessageMailbox, message::Message};
19use lunatic_process_api::{ProcessConfigCtx, ProcessCtx};
20use lunatic_sqlite_api::{SQLiteConnections, SQLiteCtx, SQLiteGuestAllocators, SQLiteStatements};
21use lunatic_stdout_capture::StdoutCapture;
22use lunatic_timer_api::{TimerCtx, TimerResources};
23use lunatic_wasi_api::{build_wasi, LunaticWasiCtx};
24use tokio::net::{TcpListener, UdpSocket};
25use tokio::sync::mpsc::unbounded_channel;
26use tokio::sync::{Mutex, RwLock};
27use wasmtime::{Linker, ResourceLimiter};
28use wasmtime_wasi::WasiCtx;
29
30use crate::DefaultProcessConfig;
31
32#[derive(Debug, Default)]
33pub struct DbResources {
34    // sqlite data
35    sqlite_connections: SQLiteConnections,
36    sqlite_statements: SQLiteStatements,
37    sqlite_guest_allocator: SQLiteGuestAllocators,
38}
39
40pub struct DefaultProcessState {
41    // Process id
42    pub(crate) id: u64,
43    pub(crate) environment: Arc<LunaticEnvironment>,
44    pub(crate) distributed: Option<DistributedProcessState>,
45    // The WebAssembly runtime
46    runtime: Option<WasmtimeRuntime>,
47    // The module that this process was spawned from
48    module: Option<Arc<WasmtimeCompiledModule<Self>>>,
49    // The process configuration
50    config: Arc<DefaultProcessConfig>,
51    // A space that can be used to temporarily store messages when sending or receiving them.
52    // Messages can contain resources that need to be added across multiple host. Likewise,
53    // receiving messages is done in two steps, first the message size is returned to allow the
54    // guest to reserve enough space, and then it's received. Both of those actions use
55    // `message` as a temp space to store messages across host calls.
56    message: Option<Message>,
57    // Signals sent to the mailbox
58    signal_mailbox: (SignalSender, SignalReceiver),
59    // Messages sent to the process
60    message_mailbox: MessageMailbox,
61    // Resources
62    resources: Resources,
63    // WASI
64    wasi: WasiCtx,
65    // WASI stdout stream
66    wasi_stdout: Option<StdoutCapture>,
67    // WASI stderr stream
68    wasi_stderr: Option<StdoutCapture>,
69    // Set to true if the WASM module has been instantiated
70    initialized: bool,
71    // database resources
72    db_resources: DbResources,
73    registry: Arc<RwLock<HashMap<String, (u64, u64)>>>,
74}
75
76impl DefaultProcessState {
77    pub fn new(
78        environment: Arc<LunaticEnvironment>,
79        distributed: Option<DistributedProcessState>,
80        runtime: WasmtimeRuntime,
81        module: Arc<WasmtimeCompiledModule<Self>>,
82        config: Arc<DefaultProcessConfig>,
83        registry: Arc<RwLock<HashMap<String, (u64, u64)>>>,
84    ) -> Result<Self> {
85        let signal_mailbox = unbounded_channel();
86        let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1)));
87        let message_mailbox = MessageMailbox::default();
88        let state = Self {
89            id: environment.get_next_process_id(),
90            environment,
91            distributed,
92            runtime: Some(runtime),
93            module: Some(module),
94            config: config.clone(),
95            message: None,
96            signal_mailbox,
97            message_mailbox,
98            resources: Resources::default(),
99            wasi: build_wasi(
100                Some(config.command_line_arguments()),
101                Some(config.environment_variables()),
102                config.preopened_dirs(),
103            )?,
104            wasi_stdout: None,
105            wasi_stderr: None,
106            initialized: false,
107            registry,
108            db_resources: DbResources::default(),
109        };
110        Ok(state)
111    }
112}
113
114impl ProcessState for DefaultProcessState {
115    type Config = DefaultProcessConfig;
116
117    fn new_state(
118        &self,
119        module: Arc<WasmtimeCompiledModule<Self>>,
120        config: Arc<DefaultProcessConfig>,
121    ) -> Result<Self> {
122        let signal_mailbox = unbounded_channel();
123        let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1)));
124        let message_mailbox = MessageMailbox::default();
125        let state = Self {
126            id: self.environment.get_next_process_id(),
127            environment: self.environment.clone(),
128            distributed: self.distributed.clone(),
129            runtime: self.runtime.clone(),
130            module: Some(module),
131            config: config.clone(),
132            message: None,
133            signal_mailbox,
134            message_mailbox,
135            resources: Resources::default(),
136            wasi: build_wasi(
137                Some(config.command_line_arguments()),
138                Some(config.environment_variables()),
139                config.preopened_dirs(),
140            )?,
141            wasi_stdout: None,
142            wasi_stderr: None,
143            initialized: false,
144            registry: self.registry.clone(),
145            db_resources: DbResources::default(),
146        };
147        Ok(state)
148    }
149
150    fn register(linker: &mut Linker<Self>) -> Result<()> {
151        lunatic_error_api::register(linker)?;
152        lunatic_process_api::register(linker)?;
153        lunatic_messaging_api::register(linker)?;
154        lunatic_timer_api::register(linker)?;
155        lunatic_networking_api::register(linker)?;
156        lunatic_version_api::register(linker)?;
157        lunatic_wasi_api::register(linker)?;
158        lunatic_registry_api::register(linker)?;
159        lunatic_distributed_api::register(linker)?;
160        lunatic_sqlite_api::register(linker)?;
161        #[cfg(feature = "metrics")]
162        lunatic_metrics_api::register(linker)?;
163        lunatic_trap_api::register(linker)?;
164        Ok(())
165    }
166
167    fn initialize(&mut self) {
168        self.initialized = true;
169    }
170
171    fn is_initialized(&self) -> bool {
172        self.initialized
173    }
174
175    fn runtime(&self) -> &WasmtimeRuntime {
176        self.runtime.as_ref().unwrap()
177    }
178
179    fn config(&self) -> &Arc<DefaultProcessConfig> {
180        &self.config
181    }
182
183    fn module(&self) -> &Arc<WasmtimeCompiledModule<Self>> {
184        self.module.as_ref().unwrap()
185    }
186
187    fn id(&self) -> u64 {
188        self.id
189    }
190
191    fn signal_mailbox(&self) -> &(SignalSender, SignalReceiver) {
192        &self.signal_mailbox
193    }
194
195    fn message_mailbox(&self) -> &MessageMailbox {
196        &self.message_mailbox
197    }
198
199    fn config_resources(&self) -> &ConfigResources<<DefaultProcessState as ProcessState>::Config> {
200        &self.resources.configs
201    }
202
203    fn config_resources_mut(
204        &mut self,
205    ) -> &mut ConfigResources<<DefaultProcessState as ProcessState>::Config> {
206        &mut self.resources.configs
207    }
208
209    fn registry(&self) -> &Arc<RwLock<HashMap<String, (u64, u64)>>> {
210        &self.registry
211    }
212}
213
214impl Debug for DefaultProcessState {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        f.debug_struct("State")
217            .field("process", &self.resources)
218            .finish()
219    }
220}
221
222// Limit the maximum memory of the process depending on the environment it was spawned in.
223impl ResourceLimiter for DefaultProcessState {
224    fn memory_growing(&mut self, _current: usize, desired: usize, _maximum: Option<usize>) -> bool {
225        desired <= self.config().get_max_memory()
226    }
227
228    fn table_growing(&mut self, _current: u32, desired: u32, _maximum: Option<u32>) -> bool {
229        desired < 100_000
230    }
231
232    // Allow one instance per store
233    fn instances(&self) -> usize {
234        1
235    }
236
237    // Allow one table per store
238    fn tables(&self) -> usize {
239        1
240    }
241
242    // Allow one memory per store
243    fn memories(&self) -> usize {
244        1
245    }
246}
247
248impl ErrorCtx for DefaultProcessState {
249    fn error_resources(&self) -> &ErrorResource {
250        &self.resources.errors
251    }
252
253    fn error_resources_mut(&mut self) -> &mut ErrorResource {
254        &mut self.resources.errors
255    }
256}
257
258impl ProcessCtx<DefaultProcessState> for DefaultProcessState {
259    fn mailbox(&mut self) -> &mut MessageMailbox {
260        &mut self.message_mailbox
261    }
262
263    fn message_scratch_area(&mut self) -> &mut Option<Message> {
264        &mut self.message
265    }
266
267    fn module_resources(&self) -> &lunatic_process_api::ModuleResources<DefaultProcessState> {
268        &self.resources.modules
269    }
270
271    fn module_resources_mut(
272        &mut self,
273    ) -> &mut lunatic_process_api::ModuleResources<DefaultProcessState> {
274        &mut self.resources.modules
275    }
276
277    fn environment(&self) -> Arc<dyn Environment> {
278        self.environment.clone()
279    }
280}
281
282impl NetworkingCtx for DefaultProcessState {
283    fn tcp_listener_resources(&self) -> &lunatic_networking_api::TcpListenerResources {
284        &self.resources.tcp_listeners
285    }
286
287    fn tcp_listener_resources_mut(&mut self) -> &mut lunatic_networking_api::TcpListenerResources {
288        &mut self.resources.tcp_listeners
289    }
290
291    fn tcp_stream_resources(&self) -> &lunatic_networking_api::TcpStreamResources {
292        &self.resources.tcp_streams
293    }
294
295    fn tcp_stream_resources_mut(&mut self) -> &mut lunatic_networking_api::TcpStreamResources {
296        &mut self.resources.tcp_streams
297    }
298
299    fn tls_listener_resources(&self) -> &lunatic_networking_api::TlsListenerResources {
300        &self.resources.tls_listeners
301    }
302
303    fn tls_listener_resources_mut(&mut self) -> &mut lunatic_networking_api::TlsListenerResources {
304        &mut self.resources.tls_listeners
305    }
306
307    fn tls_stream_resources(&self) -> &lunatic_networking_api::TlsStreamResources {
308        &self.resources.tls_streams
309    }
310
311    fn tls_stream_resources_mut(&mut self) -> &mut lunatic_networking_api::TlsStreamResources {
312        &mut self.resources.tls_streams
313    }
314
315    fn udp_resources(&self) -> &lunatic_networking_api::UdpResources {
316        &self.resources.udp_sockets
317    }
318
319    fn udp_resources_mut(&mut self) -> &mut lunatic_networking_api::UdpResources {
320        &mut self.resources.udp_sockets
321    }
322
323    fn dns_resources(&self) -> &lunatic_networking_api::DnsResources {
324        &self.resources.dns_iterators
325    }
326
327    fn dns_resources_mut(&mut self) -> &mut lunatic_networking_api::DnsResources {
328        &mut self.resources.dns_iterators
329    }
330}
331
332impl TimerCtx for DefaultProcessState {
333    fn timer_resources(&self) -> &TimerResources {
334        &self.resources.timers
335    }
336
337    fn timer_resources_mut(&mut self) -> &mut TimerResources {
338        &mut self.resources.timers
339    }
340}
341
342impl LunaticWasiCtx for DefaultProcessState {
343    fn wasi(&self) -> &WasiCtx {
344        &self.wasi
345    }
346
347    fn wasi_mut(&mut self) -> &mut WasiCtx {
348        &mut self.wasi
349    }
350
351    // Redirect the stdout stream
352    fn set_stdout(&mut self, stdout: StdoutCapture) {
353        self.wasi_stdout = Some(stdout.clone());
354        self.wasi.set_stdout(Box::new(stdout));
355    }
356
357    // Redirect the stderr stream
358    fn set_stderr(&mut self, stderr: StdoutCapture) {
359        self.wasi_stderr = Some(stderr.clone());
360        self.wasi.set_stderr(Box::new(stderr));
361    }
362
363    fn get_stdout(&self) -> Option<&StdoutCapture> {
364        self.wasi_stdout.as_ref()
365    }
366
367    fn get_stderr(&self) -> Option<&StdoutCapture> {
368        self.wasi_stderr.as_ref()
369    }
370}
371
372impl SQLiteCtx for DefaultProcessState {
373    fn sqlite_connections(&self) -> &SQLiteConnections {
374        &self.db_resources.sqlite_connections
375    }
376
377    fn sqlite_connections_mut(&mut self) -> &mut SQLiteConnections {
378        &mut self.db_resources.sqlite_connections
379    }
380
381    fn sqlite_statements_mut(&mut self) -> &mut SQLiteStatements {
382        &mut self.db_resources.sqlite_statements
383    }
384
385    fn sqlite_statements(&self) -> &SQLiteStatements {
386        &self.db_resources.sqlite_statements
387    }
388
389    fn sqlite_guest_allocator(&self) -> &SQLiteGuestAllocators {
390        &self.db_resources.sqlite_guest_allocator
391    }
392    fn sqlite_guest_allocator_mut(&mut self) -> &mut SQLiteGuestAllocators {
393        &mut self.db_resources.sqlite_guest_allocator
394    }
395}
396
397#[derive(Default, Debug)]
398pub(crate) struct Resources {
399    pub(crate) configs: HashMapId<DefaultProcessConfig>,
400    pub(crate) modules: HashMapId<Arc<WasmtimeCompiledModule<DefaultProcessState>>>,
401    pub(crate) timers: TimerResources,
402    pub(crate) dns_iterators: HashMapId<DnsIterator>,
403    pub(crate) tcp_listeners: HashMapId<TcpListener>,
404    pub(crate) tcp_streams: HashMapId<Arc<TcpConnection>>,
405    pub(crate) tls_listeners: HashMapId<TlsListener>,
406    pub(crate) tls_streams: HashMapId<Arc<TlsConnection>>,
407    pub(crate) udp_sockets: HashMapId<Arc<UdpSocket>>,
408    pub(crate) errors: HashMapId<anyhow::Error>,
409}
410
411impl DistributedCtx<LunaticEnvironment> for DefaultProcessState {
412    fn distributed_mut(&mut self) -> Result<&mut DistributedProcessState> {
413        match self.distributed.as_mut() {
414            Some(d) => Ok(d),
415            None => Err(anyhow::anyhow!("Distributed is not initialized")),
416        }
417    }
418
419    fn distributed(&self) -> Result<&DistributedProcessState> {
420        match self.distributed.as_ref() {
421            Some(d) => Ok(d),
422            None => Err(anyhow::anyhow!("Distributed is not initialized")),
423        }
424    }
425
426    fn module_id(&self) -> u64 {
427        self.module
428            .as_ref()
429            .and_then(|m| m.source().id)
430            .unwrap_or(0)
431    }
432
433    fn environment_id(&self) -> u64 {
434        self.environment.id()
435    }
436
437    fn can_spawn(&self) -> bool {
438        self.config().can_spawn_processes()
439    }
440
441    fn new_dist_state(
442        environment: Arc<LunaticEnvironment>,
443        distributed: DistributedProcessState,
444        runtime: WasmtimeRuntime,
445        module: Arc<WasmtimeCompiledModule<Self>>,
446        config: Arc<Self::Config>,
447    ) -> Result<Self> {
448        let signal_mailbox = unbounded_channel();
449        let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1)));
450        let message_mailbox = MessageMailbox::default();
451        let state = Self {
452            id: environment.get_next_process_id(),
453            environment,
454            distributed: Some(distributed),
455            runtime: Some(runtime),
456            module: Some(module),
457            config: config.clone(),
458            message: None,
459            signal_mailbox,
460            message_mailbox,
461            resources: Resources::default(),
462            wasi: build_wasi(
463                Some(config.command_line_arguments()),
464                Some(config.environment_variables()),
465                config.preopened_dirs(),
466            )?,
467            wasi_stdout: None,
468            wasi_stderr: None,
469            initialized: false,
470            registry: Default::default(), // TODO move registry into env?
471            db_resources: DbResources::default(),
472        };
473        Ok(state)
474    }
475}
476
// Compiled only for test builds; without the `cfg(test)` gate an empty `tests` module would
// also be part of regular builds.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use lunatic_process::env::{Environment, LunaticEnvironment};
    use lunatic_process::runtimes::wasmtime::WasmtimeRuntime;
    use lunatic_process::wasm::spawn_wasm;
    use tokio::sync::RwLock;

    use crate::state::DefaultProcessState;
    use crate::DefaultProcessConfig;

    /// Compiles `./wat/all_imports.wat` and spawns its `hello` function with the default
    /// configuration. The test passes when every step succeeds; each result is unwrapped.
    #[tokio::test]
    async fn import_filter_signature_matches() {
        // The default configuration includes both the "lunatic::*" and the "wasi_*" namespaces.
        let config = DefaultProcessConfig::default();

        // Create the wasmtime runtime.
        let mut wasmtime_config = wasmtime::Config::new();
        wasmtime_config.async_support(true).consume_fuel(true);
        let runtime = WasmtimeRuntime::new(&wasmtime_config).unwrap();

        let raw_module = wat::parse_file("./wat/all_imports.wat").unwrap();
        let module = Arc::new(runtime.compile_module(raw_module.into()).unwrap());
        let env = Arc::new(LunaticEnvironment::new(0));
        let registry = Arc::new(RwLock::new(HashMap::new()));
        let state = DefaultProcessState::new(
            env.clone(),
            None,
            runtime.clone(),
            module.clone(),
            Arc::new(config),
            registry,
        )
        .unwrap();

        env.can_spawn_next_process().await.unwrap();

        spawn_wasm(env, runtime, &module, state, "hello", Vec::new(), None)
            .await
            .unwrap();
    }
}
519}