1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::sync::Arc;
4
5use anyhow::Result;
6use hash_map_id::HashMapId;
7use lunatic_distributed::{DistributedCtx, DistributedProcessState};
8use lunatic_error_api::{ErrorCtx, ErrorResource};
9use lunatic_networking_api::{DnsIterator, TlsConnection, TlsListener};
10use lunatic_networking_api::{NetworkingCtx, TcpConnection};
11use lunatic_process::env::{Environment, LunaticEnvironment};
12use lunatic_process::runtimes::wasmtime::{WasmtimeCompiledModule, WasmtimeRuntime};
13use lunatic_process::state::{ConfigResources, ProcessState};
14use lunatic_process::{
15 config::ProcessConfig,
16 state::{SignalReceiver, SignalSender},
17};
18use lunatic_process::{mailbox::MessageMailbox, message::Message};
19use lunatic_process_api::{ProcessConfigCtx, ProcessCtx};
20use lunatic_sqlite_api::{SQLiteConnections, SQLiteCtx, SQLiteGuestAllocators, SQLiteStatements};
21use lunatic_stdout_capture::StdoutCapture;
22use lunatic_timer_api::{TimerCtx, TimerResources};
23use lunatic_wasi_api::{build_wasi, LunaticWasiCtx};
24use tokio::net::{TcpListener, UdpSocket};
25use tokio::sync::mpsc::unbounded_channel;
26use tokio::sync::{Mutex, RwLock};
27use wasmtime::{Linker, ResourceLimiter};
28use wasmtime_wasi::WasiCtx;
29
30use crate::DefaultProcessConfig;
31
32#[derive(Debug, Default)]
33pub struct DbResources {
34 sqlite_connections: SQLiteConnections,
36 sqlite_statements: SQLiteStatements,
37 sqlite_guest_allocator: SQLiteGuestAllocators,
38}
39
40pub struct DefaultProcessState {
41 pub(crate) id: u64,
43 pub(crate) environment: Arc<LunaticEnvironment>,
44 pub(crate) distributed: Option<DistributedProcessState>,
45 runtime: Option<WasmtimeRuntime>,
47 module: Option<Arc<WasmtimeCompiledModule<Self>>>,
49 config: Arc<DefaultProcessConfig>,
51 message: Option<Message>,
57 signal_mailbox: (SignalSender, SignalReceiver),
59 message_mailbox: MessageMailbox,
61 resources: Resources,
63 wasi: WasiCtx,
65 wasi_stdout: Option<StdoutCapture>,
67 wasi_stderr: Option<StdoutCapture>,
69 initialized: bool,
71 db_resources: DbResources,
73 registry: Arc<RwLock<HashMap<String, (u64, u64)>>>,
74}
75
76impl DefaultProcessState {
77 pub fn new(
78 environment: Arc<LunaticEnvironment>,
79 distributed: Option<DistributedProcessState>,
80 runtime: WasmtimeRuntime,
81 module: Arc<WasmtimeCompiledModule<Self>>,
82 config: Arc<DefaultProcessConfig>,
83 registry: Arc<RwLock<HashMap<String, (u64, u64)>>>,
84 ) -> Result<Self> {
85 let signal_mailbox = unbounded_channel();
86 let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1)));
87 let message_mailbox = MessageMailbox::default();
88 let state = Self {
89 id: environment.get_next_process_id(),
90 environment,
91 distributed,
92 runtime: Some(runtime),
93 module: Some(module),
94 config: config.clone(),
95 message: None,
96 signal_mailbox,
97 message_mailbox,
98 resources: Resources::default(),
99 wasi: build_wasi(
100 Some(config.command_line_arguments()),
101 Some(config.environment_variables()),
102 config.preopened_dirs(),
103 )?,
104 wasi_stdout: None,
105 wasi_stderr: None,
106 initialized: false,
107 registry,
108 db_resources: DbResources::default(),
109 };
110 Ok(state)
111 }
112}
113
114impl ProcessState for DefaultProcessState {
115 type Config = DefaultProcessConfig;
116
117 fn new_state(
118 &self,
119 module: Arc<WasmtimeCompiledModule<Self>>,
120 config: Arc<DefaultProcessConfig>,
121 ) -> Result<Self> {
122 let signal_mailbox = unbounded_channel();
123 let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1)));
124 let message_mailbox = MessageMailbox::default();
125 let state = Self {
126 id: self.environment.get_next_process_id(),
127 environment: self.environment.clone(),
128 distributed: self.distributed.clone(),
129 runtime: self.runtime.clone(),
130 module: Some(module),
131 config: config.clone(),
132 message: None,
133 signal_mailbox,
134 message_mailbox,
135 resources: Resources::default(),
136 wasi: build_wasi(
137 Some(config.command_line_arguments()),
138 Some(config.environment_variables()),
139 config.preopened_dirs(),
140 )?,
141 wasi_stdout: None,
142 wasi_stderr: None,
143 initialized: false,
144 registry: self.registry.clone(),
145 db_resources: DbResources::default(),
146 };
147 Ok(state)
148 }
149
150 fn register(linker: &mut Linker<Self>) -> Result<()> {
151 lunatic_error_api::register(linker)?;
152 lunatic_process_api::register(linker)?;
153 lunatic_messaging_api::register(linker)?;
154 lunatic_timer_api::register(linker)?;
155 lunatic_networking_api::register(linker)?;
156 lunatic_version_api::register(linker)?;
157 lunatic_wasi_api::register(linker)?;
158 lunatic_registry_api::register(linker)?;
159 lunatic_distributed_api::register(linker)?;
160 lunatic_sqlite_api::register(linker)?;
161 #[cfg(feature = "metrics")]
162 lunatic_metrics_api::register(linker)?;
163 lunatic_trap_api::register(linker)?;
164 Ok(())
165 }
166
167 fn initialize(&mut self) {
168 self.initialized = true;
169 }
170
171 fn is_initialized(&self) -> bool {
172 self.initialized
173 }
174
175 fn runtime(&self) -> &WasmtimeRuntime {
176 self.runtime.as_ref().unwrap()
177 }
178
179 fn config(&self) -> &Arc<DefaultProcessConfig> {
180 &self.config
181 }
182
183 fn module(&self) -> &Arc<WasmtimeCompiledModule<Self>> {
184 self.module.as_ref().unwrap()
185 }
186
187 fn id(&self) -> u64 {
188 self.id
189 }
190
191 fn signal_mailbox(&self) -> &(SignalSender, SignalReceiver) {
192 &self.signal_mailbox
193 }
194
195 fn message_mailbox(&self) -> &MessageMailbox {
196 &self.message_mailbox
197 }
198
199 fn config_resources(&self) -> &ConfigResources<<DefaultProcessState as ProcessState>::Config> {
200 &self.resources.configs
201 }
202
203 fn config_resources_mut(
204 &mut self,
205 ) -> &mut ConfigResources<<DefaultProcessState as ProcessState>::Config> {
206 &mut self.resources.configs
207 }
208
209 fn registry(&self) -> &Arc<RwLock<HashMap<String, (u64, u64)>>> {
210 &self.registry
211 }
212}
213
214impl Debug for DefaultProcessState {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 f.debug_struct("State")
217 .field("process", &self.resources)
218 .finish()
219 }
220}
221
222impl ResourceLimiter for DefaultProcessState {
224 fn memory_growing(&mut self, _current: usize, desired: usize, _maximum: Option<usize>) -> bool {
225 desired <= self.config().get_max_memory()
226 }
227
228 fn table_growing(&mut self, _current: u32, desired: u32, _maximum: Option<u32>) -> bool {
229 desired < 100_000
230 }
231
232 fn instances(&self) -> usize {
234 1
235 }
236
237 fn tables(&self) -> usize {
239 1
240 }
241
242 fn memories(&self) -> usize {
244 1
245 }
246}
247
248impl ErrorCtx for DefaultProcessState {
249 fn error_resources(&self) -> &ErrorResource {
250 &self.resources.errors
251 }
252
253 fn error_resources_mut(&mut self) -> &mut ErrorResource {
254 &mut self.resources.errors
255 }
256}
257
258impl ProcessCtx<DefaultProcessState> for DefaultProcessState {
259 fn mailbox(&mut self) -> &mut MessageMailbox {
260 &mut self.message_mailbox
261 }
262
263 fn message_scratch_area(&mut self) -> &mut Option<Message> {
264 &mut self.message
265 }
266
267 fn module_resources(&self) -> &lunatic_process_api::ModuleResources<DefaultProcessState> {
268 &self.resources.modules
269 }
270
271 fn module_resources_mut(
272 &mut self,
273 ) -> &mut lunatic_process_api::ModuleResources<DefaultProcessState> {
274 &mut self.resources.modules
275 }
276
277 fn environment(&self) -> Arc<dyn Environment> {
278 self.environment.clone()
279 }
280}
281
282impl NetworkingCtx for DefaultProcessState {
283 fn tcp_listener_resources(&self) -> &lunatic_networking_api::TcpListenerResources {
284 &self.resources.tcp_listeners
285 }
286
287 fn tcp_listener_resources_mut(&mut self) -> &mut lunatic_networking_api::TcpListenerResources {
288 &mut self.resources.tcp_listeners
289 }
290
291 fn tcp_stream_resources(&self) -> &lunatic_networking_api::TcpStreamResources {
292 &self.resources.tcp_streams
293 }
294
295 fn tcp_stream_resources_mut(&mut self) -> &mut lunatic_networking_api::TcpStreamResources {
296 &mut self.resources.tcp_streams
297 }
298
299 fn tls_listener_resources(&self) -> &lunatic_networking_api::TlsListenerResources {
300 &self.resources.tls_listeners
301 }
302
303 fn tls_listener_resources_mut(&mut self) -> &mut lunatic_networking_api::TlsListenerResources {
304 &mut self.resources.tls_listeners
305 }
306
307 fn tls_stream_resources(&self) -> &lunatic_networking_api::TlsStreamResources {
308 &self.resources.tls_streams
309 }
310
311 fn tls_stream_resources_mut(&mut self) -> &mut lunatic_networking_api::TlsStreamResources {
312 &mut self.resources.tls_streams
313 }
314
315 fn udp_resources(&self) -> &lunatic_networking_api::UdpResources {
316 &self.resources.udp_sockets
317 }
318
319 fn udp_resources_mut(&mut self) -> &mut lunatic_networking_api::UdpResources {
320 &mut self.resources.udp_sockets
321 }
322
323 fn dns_resources(&self) -> &lunatic_networking_api::DnsResources {
324 &self.resources.dns_iterators
325 }
326
327 fn dns_resources_mut(&mut self) -> &mut lunatic_networking_api::DnsResources {
328 &mut self.resources.dns_iterators
329 }
330}
331
332impl TimerCtx for DefaultProcessState {
333 fn timer_resources(&self) -> &TimerResources {
334 &self.resources.timers
335 }
336
337 fn timer_resources_mut(&mut self) -> &mut TimerResources {
338 &mut self.resources.timers
339 }
340}
341
342impl LunaticWasiCtx for DefaultProcessState {
343 fn wasi(&self) -> &WasiCtx {
344 &self.wasi
345 }
346
347 fn wasi_mut(&mut self) -> &mut WasiCtx {
348 &mut self.wasi
349 }
350
351 fn set_stdout(&mut self, stdout: StdoutCapture) {
353 self.wasi_stdout = Some(stdout.clone());
354 self.wasi.set_stdout(Box::new(stdout));
355 }
356
357 fn set_stderr(&mut self, stderr: StdoutCapture) {
359 self.wasi_stderr = Some(stderr.clone());
360 self.wasi.set_stderr(Box::new(stderr));
361 }
362
363 fn get_stdout(&self) -> Option<&StdoutCapture> {
364 self.wasi_stdout.as_ref()
365 }
366
367 fn get_stderr(&self) -> Option<&StdoutCapture> {
368 self.wasi_stderr.as_ref()
369 }
370}
371
372impl SQLiteCtx for DefaultProcessState {
373 fn sqlite_connections(&self) -> &SQLiteConnections {
374 &self.db_resources.sqlite_connections
375 }
376
377 fn sqlite_connections_mut(&mut self) -> &mut SQLiteConnections {
378 &mut self.db_resources.sqlite_connections
379 }
380
381 fn sqlite_statements_mut(&mut self) -> &mut SQLiteStatements {
382 &mut self.db_resources.sqlite_statements
383 }
384
385 fn sqlite_statements(&self) -> &SQLiteStatements {
386 &self.db_resources.sqlite_statements
387 }
388
389 fn sqlite_guest_allocator(&self) -> &SQLiteGuestAllocators {
390 &self.db_resources.sqlite_guest_allocator
391 }
392 fn sqlite_guest_allocator_mut(&mut self) -> &mut SQLiteGuestAllocators {
393 &mut self.db_resources.sqlite_guest_allocator
394 }
395}
396
397#[derive(Default, Debug)]
398pub(crate) struct Resources {
399 pub(crate) configs: HashMapId<DefaultProcessConfig>,
400 pub(crate) modules: HashMapId<Arc<WasmtimeCompiledModule<DefaultProcessState>>>,
401 pub(crate) timers: TimerResources,
402 pub(crate) dns_iterators: HashMapId<DnsIterator>,
403 pub(crate) tcp_listeners: HashMapId<TcpListener>,
404 pub(crate) tcp_streams: HashMapId<Arc<TcpConnection>>,
405 pub(crate) tls_listeners: HashMapId<TlsListener>,
406 pub(crate) tls_streams: HashMapId<Arc<TlsConnection>>,
407 pub(crate) udp_sockets: HashMapId<Arc<UdpSocket>>,
408 pub(crate) errors: HashMapId<anyhow::Error>,
409}
410
411impl DistributedCtx<LunaticEnvironment> for DefaultProcessState {
412 fn distributed_mut(&mut self) -> Result<&mut DistributedProcessState> {
413 match self.distributed.as_mut() {
414 Some(d) => Ok(d),
415 None => Err(anyhow::anyhow!("Distributed is not initialized")),
416 }
417 }
418
419 fn distributed(&self) -> Result<&DistributedProcessState> {
420 match self.distributed.as_ref() {
421 Some(d) => Ok(d),
422 None => Err(anyhow::anyhow!("Distributed is not initialized")),
423 }
424 }
425
426 fn module_id(&self) -> u64 {
427 self.module
428 .as_ref()
429 .and_then(|m| m.source().id)
430 .unwrap_or(0)
431 }
432
433 fn environment_id(&self) -> u64 {
434 self.environment.id()
435 }
436
437 fn can_spawn(&self) -> bool {
438 self.config().can_spawn_processes()
439 }
440
441 fn new_dist_state(
442 environment: Arc<LunaticEnvironment>,
443 distributed: DistributedProcessState,
444 runtime: WasmtimeRuntime,
445 module: Arc<WasmtimeCompiledModule<Self>>,
446 config: Arc<Self::Config>,
447 ) -> Result<Self> {
448 let signal_mailbox = unbounded_channel();
449 let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1)));
450 let message_mailbox = MessageMailbox::default();
451 let state = Self {
452 id: environment.get_next_process_id(),
453 environment,
454 distributed: Some(distributed),
455 runtime: Some(runtime),
456 module: Some(module),
457 config: config.clone(),
458 message: None,
459 signal_mailbox,
460 message_mailbox,
461 resources: Resources::default(),
462 wasi: build_wasi(
463 Some(config.command_line_arguments()),
464 Some(config.environment_variables()),
465 config.preopened_dirs(),
466 )?,
467 wasi_stdout: None,
468 wasi_stderr: None,
469 initialized: false,
470 registry: Default::default(), db_resources: DbResources::default(),
472 };
473 Ok(state)
474 }
475}
476
// Compiled only for test builds so that nothing in here ends up in regular
// builds of the crate.
#[cfg(test)]
mod tests {

    /// Compiles `./wat/all_imports.wat`, builds a `DefaultProcessState` for it
    /// and spawns the module's `hello` function, unwrapping every step.
    // NOTE(review): judging by the name, this is meant to fail when an import
    // in the `.wat` file does not match the signature of the host function
    // registered for it - confirm.
    #[tokio::test]
    async fn import_filter_signature_matches() {
        use std::collections::HashMap;
        use tokio::sync::RwLock;

        use crate::state::DefaultProcessState;
        use crate::DefaultProcessConfig;
        use lunatic_process::env::Environment;
        use lunatic_process::runtimes::wasmtime::WasmtimeRuntime;
        use lunatic_process::wasm::spawn_wasm;
        use std::sync::Arc;

        let config = DefaultProcessConfig::default();

        let mut wasmtime_config = wasmtime::Config::new();
        wasmtime_config.async_support(true).consume_fuel(true);
        let runtime = WasmtimeRuntime::new(&wasmtime_config).unwrap();

        let raw_module = wat::parse_file("./wat/all_imports.wat").unwrap();
        let module = Arc::new(runtime.compile_module(raw_module.into()).unwrap());
        let env = Arc::new(lunatic_process::env::LunaticEnvironment::new(0));
        let registry = Arc::new(RwLock::new(HashMap::new()));
        let state = DefaultProcessState::new(
            env.clone(),
            None,
            runtime.clone(),
            module.clone(),
            Arc::new(config),
            registry,
        )
        .unwrap();

        env.can_spawn_next_process().await.unwrap();

        spawn_wasm(env, runtime, &module, state, "hello", Vec::new(), None)
            .await
            .unwrap();
    }
}