obeli_sk_wasm_workers/
engines.rs1use std::{error::Error, fmt::Debug, path::Path, sync::Arc};
2use tracing::{debug, warn};
3use wasmtime::{Cache, CacheConfig, Engine, EngineWeak, WasmBacktraceDetails};
4
5#[derive(thiserror::Error, Debug)]
6pub enum EngineError {
7 #[error("uncagegorized engine creation error - {0}")]
8 Uncategorized(Box<dyn Error + Send + Sync>),
9 #[error("error configuring the codegen cache")]
10 CodegenCache(Box<dyn Error + Send + Sync>),
11}
12
/// Optional tuning knobs for wasmtime's pooling instance allocator.
///
/// Every field is optional: `None` means the corresponding setting on
/// `wasmtime::PoolingAllocationConfig` is left at whatever wasmtime defaults to.
/// The values are applied in `PoolingConfig::strategy`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PoolingOptions {
    /// Passed to `PoolingAllocationConfig::linear_memory_keep_resident` when set.
    pub pooling_memory_keep_resident: Option<usize>,

    /// Passed to `PoolingAllocationConfig::table_keep_resident` when set.
    pub pooling_table_keep_resident: Option<usize>,

    /// When `Some(true)`, memory protection keys are requested with `wasmtime::Enabled::Auto`.
    /// `None` and `Some(false)` both leave the wasmtime default untouched.
    pub memory_protection_keys: Option<bool>,

    /// Passed to `PoolingAllocationConfig::total_core_instances` when set.
    pub pooling_total_core_instances: Option<u32>,

    /// Passed to `PoolingAllocationConfig::total_component_instances` when set.
    pub pooling_total_component_instances: Option<u32>,

    /// Passed to `PoolingAllocationConfig::total_memories` when set.
    pub pooling_total_memories: Option<u32>,

    /// Passed to `PoolingAllocationConfig::total_tables` when set.
    pub pooling_total_tables: Option<u32>,

    /// Passed to `PoolingAllocationConfig::total_stacks` when set.
    pub pooling_total_stacks: Option<u32>,

    /// Passed to `PoolingAllocationConfig::max_memory_size` when set.
    pub pooling_max_memory_size: Option<usize>,
}
52
53#[derive(Clone, Debug)]
54pub enum PoolingConfig {
55 OnDemand,
56 Pooling(PoolingOptions),
57 PoolingWithFallback(PoolingOptions),
58}
59impl PoolingConfig {
60 fn strategy(&self) -> wasmtime::InstanceAllocationStrategy {
61 match self {
62 PoolingConfig::OnDemand => wasmtime::InstanceAllocationStrategy::OnDemand,
63 PoolingConfig::Pooling(opts) | PoolingConfig::PoolingWithFallback(opts) => {
64 let mut cfg = wasmtime::PoolingAllocationConfig::default();
65 if let Some(size) = opts.pooling_memory_keep_resident {
66 cfg.linear_memory_keep_resident(size);
67 }
68 if let Some(size) = opts.pooling_table_keep_resident {
69 cfg.table_keep_resident(size);
70 }
71 if let Some(limit) = opts.pooling_total_core_instances {
72 cfg.total_core_instances(limit);
73 }
74 if let Some(limit) = opts.pooling_total_component_instances {
75 cfg.total_component_instances(limit);
76 }
77 if let Some(limit) = opts.pooling_total_memories {
78 cfg.total_memories(limit);
79 }
80 if let Some(limit) = opts.pooling_total_tables {
81 cfg.total_tables(limit);
82 }
83 if let Some(limit) = opts.pooling_total_stacks {
84 cfg.total_stacks(limit);
85 }
86 if let Some(limit) = opts.pooling_max_memory_size {
87 cfg.max_memory_size(limit);
88 }
89 if let Some(enable) = opts.memory_protection_keys
90 && enable
91 {
92 cfg.memory_protection_keys(wasmtime::Enabled::Auto);
93 }
94 wasmtime::InstanceAllocationStrategy::Pooling(cfg)
95 }
96 }
97 }
98}
99#[derive(Clone, Debug)]
100pub struct EngineConfig {
101 pub pooling_config: PoolingConfig,
102 pub codegen_cache_dir: Option<Arc<Path>>,
103 pub consume_fuel: bool,
104 pub parallel_compilation: bool,
105 pub debug: bool,
106}
107
108impl EngineConfig {
109 #[cfg(any(test, feature = "test"))]
110 #[must_use]
111 pub fn on_demand_testing() -> Self {
112 let workspace_dir = std::path::PathBuf::from(
113 std::env::var("CARGO_WORKSPACE_DIR").expect("CARGO_WORKSPACE_DIR must be set"),
114 );
115 let codegen_cache = workspace_dir.join("test-codegen-cache");
116 Self {
117 pooling_config: PoolingConfig::OnDemand,
118 codegen_cache_dir: Some(Arc::from(codegen_cache)),
119 consume_fuel: false,
120 parallel_compilation: true,
121 debug: false,
122 }
123 }
124
125 #[cfg(test)]
126 pub(crate) fn pooling_nocache_testing(opts: PoolingOptions) -> Self {
127 Self {
128 pooling_config: PoolingConfig::Pooling(opts),
129 codegen_cache_dir: None,
130 consume_fuel: false,
131 parallel_compilation: true,
132 debug: false,
133 }
134 }
135}
136
137#[derive(Clone)]
138pub struct Engines {
139 pub activity_engine: Arc<Engine>,
140 pub webhook_engine: Arc<Engine>,
141 pub workflow_engine: Arc<Engine>,
142}
143
144impl Engines {
145 #[must_use]
146 pub fn weak_refs(&self) -> Vec<EngineWeak> {
147 vec![
148 self.activity_engine.weak(),
149 self.workflow_engine.weak(),
150 self.workflow_engine.weak(),
151 ]
152 }
153
154 fn configure_common(
155 mut dst_wasmtime_config: wasmtime::Config,
156 config: EngineConfig,
157 ) -> Result<Arc<Engine>, EngineError> {
158 dst_wasmtime_config.wasm_component_model(true);
159
160 dst_wasmtime_config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
161 dst_wasmtime_config.epoch_interruption(true);
162 dst_wasmtime_config.consume_fuel(config.consume_fuel);
163
164 if config.debug {
165 dst_wasmtime_config.debug_info(true);
166 dst_wasmtime_config.cranelift_opt_level(wasmtime::OptLevel::None);
167 }
168
169 dst_wasmtime_config.parallel_compilation(config.parallel_compilation);
170
171 dst_wasmtime_config.allocation_strategy(config.pooling_config.strategy());
172 if let Some(codegen_cache_dir) = config.codegen_cache_dir {
173 let mut cache_config = CacheConfig::new();
174 cache_config.with_directory(codegen_cache_dir.as_ref());
175 let cache =
176 Cache::new(cache_config).map_err(|err| EngineError::CodegenCache(err.into()))?;
177 dst_wasmtime_config.cache(Some(cache));
178 }
179 Engine::new(&dst_wasmtime_config)
180 .map(Arc::new)
181 .map_err(|err| EngineError::Uncategorized(err.into()))
182 }
183
184 pub(crate) fn get_webhook_engine(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
185 Self::configure_common(wasmtime::Config::new(), config)
186 }
187
188 #[cfg(any(test, feature = "test"))]
189 pub fn get_activity_engine_test(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
190 Self::get_activity_engine_internal(config)
191 }
192
193 fn get_activity_engine_internal(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
194 Self::configure_common(wasmtime::Config::new(), config)
195 }
196
197 #[cfg(any(test, feature = "test"))]
198 pub fn get_workflow_engine_test(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
199 Self::get_workflow_engine_internal(config)
200 }
201 fn get_workflow_engine_internal(config: EngineConfig) -> Result<Arc<Engine>, EngineError> {
202 let mut wasmtime_config = wasmtime::Config::new();
203 wasmtime_config.cranelift_nan_canonicalization(true);
206 wasmtime_config.relaxed_simd_deterministic(true);
207 Self::configure_common(wasmtime_config, config)
208 }
209
210 pub fn new(engine_config: EngineConfig) -> Result<Self, EngineError> {
211 let res: Result<_, EngineError> = (|engine_config: &EngineConfig| {
212 Ok(Engines {
213 activity_engine: Self::get_activity_engine_internal(engine_config.clone())?,
214 webhook_engine: Self::get_webhook_engine(engine_config.clone())?,
215 workflow_engine: Self::get_workflow_engine_internal(engine_config.clone())?,
216 })
217 })(&engine_config);
218
219 if let Err(err) = &res
220 && matches!(
221 engine_config.pooling_config,
222 PoolingConfig::PoolingWithFallback(_)
223 )
224 {
225 warn!("Falling back to on-demand allocator - {err}");
226 debug!("{err:?}");
227 let engine_config = EngineConfig {
228 pooling_config: PoolingConfig::OnDemand,
229 codegen_cache_dir: engine_config.codegen_cache_dir,
230 consume_fuel: engine_config.consume_fuel,
231 parallel_compilation: engine_config.parallel_compilation,
232 debug: engine_config.debug,
233 };
234 Self::new(engine_config)
235 } else {
236 res
237 }
238 }
239}