Skip to main content

obeli_sk_wasm_workers/
engines.rs

1use std::{error::Error, fmt::Debug, path::Path, sync::Arc};
2use tracing::{debug, warn};
3use wasmtime::{Cache, CacheConfig, Engine, EngineWeak, WasmBacktraceDetails};
4
5#[derive(thiserror::Error, Debug)]
6pub enum EngineError {
7    #[error("uncagegorized engine creation error - {0}")]
8    Uncategorized(Box<dyn Error + Send + Sync>),
9    #[error("error configuring the codegen cache")]
10    CodegenCache(Box<dyn Error + Send + Sync>),
11}
12
13// Copied from wasmtime/crates/cli-flags
14#[derive(PartialEq, Clone, Default, Debug)]
15pub struct PoolingOptions {
16    /// How many bytes to keep resident between instantiations for the
17    /// pooling allocator in linear memories.
18    pub pooling_memory_keep_resident: Option<usize>,
19
20    /// How many bytes to keep resident between instantiations for the
21    /// pooling allocator in tables.
22    pub pooling_table_keep_resident: Option<usize>,
23
24    /// Enable memory protection keys for the pooling allocator; this can
25    /// optimize the size of memory slots.
26    pub memory_protection_keys: Option<bool>,
27
28    /// The maximum number of WebAssembly instances which can be created
29    /// with the pooling allocator.
30    pub pooling_total_core_instances: Option<u32>,
31
32    /// The maximum number of WebAssembly components which can be created
33    /// with the pooling allocator.
34    pub pooling_total_component_instances: Option<u32>,
35
36    /// The maximum number of WebAssembly memories which can be created with
37    /// the pooling allocator.
38    pub pooling_total_memories: Option<u32>,
39
40    /// The maximum number of WebAssembly tables which can be created with
41    /// the pooling allocator.
42    pub pooling_total_tables: Option<u32>,
43
44    /// The maximum number of WebAssembly stacks which can be created with
45    /// the pooling allocator.
46    pub pooling_total_stacks: Option<u32>,
47
48    /// The maximum runtime size of each linear memory in the pooling
49    /// allocator, in bytes.
50    pub pooling_max_memory_size: Option<usize>,
51}
52
53#[derive(Clone, Debug)]
54pub enum PoolingConfig {
55    OnDemand,
56    Pooling(PoolingOptions),
57    PoolingWithFallback(PoolingOptions),
58}
59impl PoolingConfig {
60    fn strategy(&self) -> wasmtime::InstanceAllocationStrategy {
61        match self {
62            PoolingConfig::OnDemand => wasmtime::InstanceAllocationStrategy::OnDemand,
63            PoolingConfig::Pooling(opts) | PoolingConfig::PoolingWithFallback(opts) => {
64                let mut cfg = wasmtime::PoolingAllocationConfig::default();
65                if let Some(size) = opts.pooling_memory_keep_resident {
66                    cfg.linear_memory_keep_resident(size);
67                }
68                if let Some(size) = opts.pooling_table_keep_resident {
69                    cfg.table_keep_resident(size);
70                }
71                if let Some(limit) = opts.pooling_total_core_instances {
72                    cfg.total_core_instances(limit);
73                }
74                if let Some(limit) = opts.pooling_total_component_instances {
75                    cfg.total_component_instances(limit);
76                }
77                if let Some(limit) = opts.pooling_total_memories {
78                    cfg.total_memories(limit);
79                }
80                if let Some(limit) = opts.pooling_total_tables {
81                    cfg.total_tables(limit);
82                }
83                if let Some(limit) = opts.pooling_total_stacks {
84                    cfg.total_stacks(limit);
85                }
86                if let Some(limit) = opts.pooling_max_memory_size {
87                    cfg.max_memory_size(limit);
88                }
89                if let Some(enable) = opts.memory_protection_keys
90                    && enable
91                {
92                    cfg.memory_protection_keys(wasmtime::Enabled::Auto);
93                }
94                wasmtime::InstanceAllocationStrategy::Pooling(cfg)
95            }
96        }
97    }
98}
99#[derive(Clone, Debug)]
100pub struct EngineConfig {
101    pub pooling_config: PoolingConfig,
102    pub codegen_cache_dir: Option<Arc<Path>>,
103    pub consume_fuel: bool,
104    pub parallel_compilation: bool,
105    pub debug: bool,
106}
107
108impl EngineConfig {
109    #[cfg(any(test, feature = "test"))]
110    #[must_use]
111    pub fn on_demand_testing() -> Self {
112        let workspace_dir = std::path::PathBuf::from(
113            std::env::var("CARGO_WORKSPACE_DIR").expect("CARGO_WORKSPACE_DIR must be set"),
114        );
115        let codegen_cache = workspace_dir.join("test-codegen-cache");
116        Self {
117            pooling_config: PoolingConfig::OnDemand,
118            codegen_cache_dir: Some(Arc::from(codegen_cache)),
119            consume_fuel: false,
120            parallel_compilation: true,
121            debug: false,
122        }
123    }
124
125    #[cfg(test)]
126    pub(crate) fn pooling_nocache_testing(opts: PoolingOptions) -> Self {
127        Self {
128            pooling_config: PoolingConfig::Pooling(opts),
129            codegen_cache_dir: None,
130            consume_fuel: false,
131            parallel_compilation: true,
132            debug: false,
133        }
134    }
135}
136
137#[derive(Clone)]
138pub struct Engines {
139    pub activity_engine: Arc<Engine>,
140    pub webhook_engine: Arc<Engine>,
141    pub workflow_engine: Arc<Engine>,
142}
143
144impl Engines {
145    #[must_use]
146    pub fn weak_refs(&self) -> Vec<EngineWeak> {
147        vec![
148            self.activity_engine.weak(),
149            self.workflow_engine.weak(),
150            self.workflow_engine.weak(),
151        ]
152    }
153
154    fn configure_common(
155        mut dst_wasmtime_config: wasmtime::Config,
156        config: EngineConfig,
157    ) -> Result<Arc<Engine>, EngineError> {
158        dst_wasmtime_config.wasm_component_model(true);
159
160        dst_wasmtime_config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
161        dst_wasmtime_config.epoch_interruption(true);
162        dst_wasmtime_config.consume_fuel(config.consume_fuel);
163
164        if config.debug {
165            dst_wasmtime_config.debug_info(true);
166            dst_wasmtime_config.cranelift_opt_level(wasmtime::OptLevel::None);
167        }
168
169        dst_wasmtime_config.parallel_compilation(config.parallel_compilation);
170
171        dst_wasmtime_config.allocation_strategy(config.pooling_config.strategy());
172        if let Some(codegen_cache_dir) = config.codegen_cache_dir {
173            let mut cache_config = CacheConfig::new();
174            cache_config.with_directory(codegen_cache_dir.as_ref());
175            let cache =
176                Cache::new(cache_config).map_err(|err| EngineError::CodegenCache(err.into()))?;
177            dst_wasmtime_config.cache(Some(cache));
178        }
179        Engine::new(&dst_wasmtime_config)
180            .map(Arc::new)
181            .map_err(|err| EngineError::Uncategorized(err.into()))
182    }
183
184    pub(crate) fn get_webhook_engine(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
185        Self::configure_common(wasmtime::Config::new(), config)
186    }
187
188    #[cfg(any(test, feature = "test"))]
189    pub fn get_activity_engine_test(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
190        Self::get_activity_engine_internal(config)
191    }
192
193    fn get_activity_engine_internal(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
194        Self::configure_common(wasmtime::Config::new(), config)
195    }
196
197    #[cfg(any(test, feature = "test"))]
198    pub fn get_workflow_engine_test(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
199        Self::get_workflow_engine_internal(config)
200    }
201    fn get_workflow_engine_internal(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
202        let mut wasmtime_config = wasmtime::Config::new();
203        // Make sure the runtime is deterministic when using `simd` or `relaxed_simd`.
204        // https://bytecodealliance.zulipchat.com/#narrow/channel/206238-general/topic/Determinism.20of.20Wasm.20SIMD.20in.20Wasmtime
205        wasmtime_config.cranelift_nan_canonicalization(true);
206        wasmtime_config.relaxed_simd_deterministic(true);
207        Self::configure_common(wasmtime_config, config)
208    }
209
210    pub fn new(engine_config: EngineConfig) -> Result<Self, EngineError> {
211        let res: Result<_, EngineError> = (|engine_config: &EngineConfig| {
212            Ok(Engines {
213                activity_engine: Self::get_activity_engine_internal(engine_config.clone())?,
214                webhook_engine: Self::get_webhook_engine(engine_config.clone())?,
215                workflow_engine: Self::get_workflow_engine_internal(engine_config.clone())?,
216            })
217        })(&engine_config);
218
219        if let Err(err) = &res
220            && matches!(
221                engine_config.pooling_config,
222                PoolingConfig::PoolingWithFallback(_)
223            )
224        {
225            warn!("Falling back to on-demand allocator - {err}");
226            debug!("{err:?}");
227            let engine_config = EngineConfig {
228                pooling_config: PoolingConfig::OnDemand,
229                codegen_cache_dir: engine_config.codegen_cache_dir,
230                consume_fuel: engine_config.consume_fuel,
231                parallel_compilation: engine_config.parallel_compilation,
232                debug: engine_config.debug,
233            };
234            Self::new(engine_config)
235        } else {
236            res
237        }
238    }
239}