greentic_component/test_harness/
mod.rs1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use blake3::Hasher;
9use greentic_interfaces_host::component::v0_5::exports::greentic::component::node;
10use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::GuestIndices;
11use greentic_types::TenantCtx;
12use serde_json::Value;
13use wasmtime::component::{Component, InstancePre};
14use wasmtime::{Config, Engine, Store};
15
16use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
17use crate::test_harness::secrets::InMemorySecretsStore;
18use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
19
20mod linker;
21mod secrets;
22mod state;
23
/// Failure reported by the component itself through the `Err` arm of its
/// invoke result, as opposed to a failure of the harness or the runtime.
///
/// `TestHarness::invoke` wraps this in an `anyhow::Error`; callers can
/// recover it with `downcast_ref::<ComponentInvokeError>()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComponentInvokeError {
    /// Error code supplied by the component.
    pub code: String,
    /// Error message supplied by the component.
    pub message: String,
    /// Retry flag supplied by the component.
    pub retryable: bool,
    /// Backoff in milliseconds supplied by the component, if any.
    pub backoff_ms: Option<u64>,
    /// Additional detail text supplied by the component, if any.
    pub details: Option<String>,
}

impl std::fmt::Display for ComponentInvokeError {
    /// Renders `component error <code>: <message>`; the retry, backoff and
    /// detail fields are intentionally not part of the text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "component error {}: {}", self.code, self.message)
    }
}

impl std::error::Error for ComponentInvokeError {}
40
/// Limits enforced by the harness rather than errors raised by the component.
///
/// `TestHarness::invoke` returns these wrapped in an `anyhow::Error` when an
/// invocation is cut short; callers can recover them with
/// `downcast_ref::<HarnessError>()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HarnessError {
    /// The invocation was interrupted after the configured timeout.
    Timeout {
        /// The configured timeout, in milliseconds.
        timeout_ms: u64,
    },
    /// The host state reported that the memory limit was hit.
    MemoryLimit {
        /// The configured memory ceiling, in bytes.
        max_memory_bytes: usize,
    },
}

impl std::fmt::Display for HarnessError {
    /// Renders a one-line description that includes the configured limit.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            HarnessError::Timeout { timeout_ms } => {
                write!(f, "execution exceeded timeout of {timeout_ms}ms")
            }
            HarnessError::MemoryLimit { max_memory_bytes } => {
                write!(
                    f,
                    "execution exceeded memory limit of {max_memory_bytes} bytes"
                )
            }
        }
    }
}

impl std::error::Error for HarnessError {}
64
65pub struct HarnessConfig {
66 pub wasm_bytes: Vec<u8>,
67 pub tenant_ctx: TenantCtx,
68 pub flow_id: String,
69 pub node_id: Option<String>,
70 pub state_prefix: String,
71 pub state_seeds: Vec<(String, Vec<u8>)>,
72 pub allow_state_read: bool,
73 pub allow_state_write: bool,
74 pub allow_state_delete: bool,
75 pub allow_secrets: bool,
76 pub allowed_secrets: HashSet<String>,
77 pub secrets: HashMap<String, String>,
78 pub wasi_preopens: Vec<WasiPreopen>,
79 pub config: Option<Value>,
80 pub allow_http: bool,
81 pub timeout_ms: u64,
82 pub max_memory_bytes: usize,
83}
84
/// One WASI preopen: a path on the host paired with the path the guest sees.
///
/// This type only carries the description; how it is applied is decided by
/// the host-state code that receives it.
#[derive(Clone, Debug)]
pub struct WasiPreopen {
    /// Path on the host side.
    pub host_path: PathBuf,
    /// Path under which the guest refers to it.
    pub guest_path: String,
    /// Read-only flag; `false` unless changed via [`WasiPreopen::read_only`].
    pub read_only: bool,
}

impl WasiPreopen {
    /// Creates a preopen for `host_path` visible as `guest_path`, with the
    /// read-only flag cleared.
    pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
        let host_path = host_path.into();
        let guest_path = guest_path.into();
        Self {
            host_path,
            guest_path,
            read_only: false,
        }
    }

    /// Builder-style setter: returns the preopen with its read-only flag set
    /// to `value` and every other field untouched.
    pub fn read_only(self, value: bool) -> Self {
        Self {
            read_only: value,
            ..self
        }
    }
}
106
107pub struct TestHarness {
108 engine: Engine,
109 instance_pre: InstancePre<HostState>,
110 guest_indices: GuestIndices,
111 state_store: Arc<InMemoryStateStore>,
112 secrets_store: Arc<InMemorySecretsStore>,
113 state_scope: StateScope,
114 allow_state_read: bool,
115 allow_state_write: bool,
116 allow_state_delete: bool,
117 exec_ctx: node::ExecCtx,
118 wasi_preopens: Vec<WasiPreopen>,
119 config_json: Option<String>,
120 allow_http: bool,
121 timeout_ms: u64,
122 max_memory_bytes: usize,
123 wasm_bytes_metadata: String,
124}
125
/// Result of a successful `TestHarness::invoke` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvokeOutcome {
    /// JSON text returned by the component.
    pub output_json: String,
    /// Wall-clock time spent instantiating the component and loading its
    /// exports, in milliseconds.
    pub instantiate_ms: u64,
    /// Wall-clock time spent inside the component's invoke call, in milliseconds.
    pub run_ms: u64,
}
131
132impl TestHarness {
133 pub fn new(config: HarnessConfig) -> Result<Self> {
134 let mut wasmtime_config = Config::new();
135 wasmtime_config.wasm_component_model(true);
136 wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
137 wasmtime_config.epoch_interruption(true);
138 let engine = Engine::new(&wasmtime_config).context("create wasmtime engine")?;
139
140 let component =
141 Component::from_binary(&engine, &config.wasm_bytes).context("load component wasm")?;
142 let wasm_bytes_metadata = describe_wasm_metadata(&config.wasm_bytes);
143
144 let linker = build_linker(&engine)?;
145 let instance_pre = linker
146 .instantiate_pre(&component)
147 .map_err(|err| {
148 eprintln!(
149 "Linker::instantiate_pre failed ({}): {err}",
150 wasm_bytes_metadata
151 );
152 for source in err.chain().skip(1) {
153 eprintln!(" cause: {source}");
154 }
155 err
156 })
157 .with_context(|| {
158 format!(
159 "prepare component instance (wasm metadata: {})",
160 wasm_bytes_metadata
161 )
162 })?;
163 let guest_indices = GuestIndices::new(&instance_pre)
164 .map_err(|err| {
165 eprintln!("GuestIndices::new failed ({}): {err}", wasm_bytes_metadata);
166 for source in err.chain().skip(1) {
167 eprintln!(" cause: {source}");
168 }
169 err
170 })
171 .with_context(|| {
172 format!(
173 "load guest indices (wasm metadata: {})",
174 wasm_bytes_metadata
175 )
176 })?;
177
178 let state_store = Arc::new(InMemoryStateStore::new());
179 let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
180 let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
181 let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
182 for (key, value) in config.state_seeds {
183 state_store.write(&scope, &key, value);
184 }
185
186 let exec_ctx = node::ExecCtx {
187 tenant: make_component_tenant_ctx(&config.tenant_ctx),
188 i18n_id: config.tenant_ctx.i18n_id.clone(),
189 flow_id: config.flow_id,
190 node_id: config.node_id,
191 };
192
193 let config_json = match config.config {
194 Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
195 None => None,
196 };
197
198 Ok(Self {
199 engine,
200 instance_pre,
201 guest_indices,
202 state_store,
203 secrets_store,
204 state_scope: scope,
205 allow_state_read: config.allow_state_read,
206 allow_state_write: config.allow_state_write,
207 allow_state_delete: config.allow_state_delete,
208 exec_ctx,
209 wasi_preopens: config.wasi_preopens,
210 config_json,
211 allow_http: config.allow_http,
212 timeout_ms: config.timeout_ms,
213 max_memory_bytes: config.max_memory_bytes,
214 wasm_bytes_metadata,
215 })
216 }
217
218 pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
219 let host_state = HostState::new(HostStateConfig {
220 base_scope: self.state_scope.clone(),
221 state_store: self.state_store.clone(),
222 secrets: self.secrets_store.clone(),
223 allow_state_read: self.allow_state_read,
224 allow_state_write: self.allow_state_write,
225 allow_state_delete: self.allow_state_delete,
226 wasi_preopens: self.wasi_preopens.clone(),
227 allow_http: self.allow_http,
228 config_json: self.config_json.clone(),
229 max_memory_bytes: self.max_memory_bytes,
230 })
231 .context("build WASI context")?;
232 let mut store = Store::new(&self.engine, host_state);
233 store.limiter(|state| state.limits_mut());
234 store.set_epoch_deadline(1);
235
236 let done = Arc::new(AtomicBool::new(false));
237 let _timeout_guard = TimeoutGuard::new(done.clone());
238 let engine = self.engine.clone();
239 let timeout_ms = self.timeout_ms;
240 std::thread::spawn(move || {
241 std::thread::sleep(Duration::from_millis(timeout_ms));
242 if !done.load(Ordering::Relaxed) {
243 engine.increment_epoch();
244 }
245 });
246
247 let instantiate_start = Instant::now();
248 let instance = self
249 .instance_pre
250 .instantiate(&mut store)
251 .context("instantiate component")
252 .and_then(|instance| {
253 self.guest_indices
254 .load(&mut store, &instance)
255 .context("load component exports")
256 .map(|exports| (instance, exports))
257 })
258 .with_context(|| {
259 format!(
260 "failed to prepare component instance (wasm metadata: {})",
261 self.wasm_bytes_metadata
262 )
263 });
264
265 let (_instance, exports) = match instance {
266 Ok(value) => value,
267 Err(err) => {
268 if is_timeout_error(&err) {
269 return Err(anyhow::Error::new(HarnessError::Timeout {
270 timeout_ms: self.timeout_ms,
271 }));
272 }
273 if store.data().memory_limit_hit() {
274 return Err(anyhow::Error::new(HarnessError::MemoryLimit {
275 max_memory_bytes: self.max_memory_bytes,
276 }));
277 }
278 return Err(err);
279 }
280 };
281 let instantiate_ms = duration_ms(instantiate_start.elapsed());
282
283 let input = serde_json::to_string(input_json).context("serialize input json")?;
284 let run_start = Instant::now();
285 let result = exports
286 .call_invoke(&mut store, &self.exec_ctx, operation, &input)
287 .context("invoke component");
288
289 use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::InvokeResult;
290
291 let result = match result {
292 Ok(result) => result,
293 Err(err) => {
294 if is_timeout_error(&err) {
295 return Err(anyhow::Error::new(HarnessError::Timeout {
296 timeout_ms: self.timeout_ms,
297 }));
298 }
299 if store.data().memory_limit_hit() {
300 return Err(anyhow::Error::new(HarnessError::MemoryLimit {
301 max_memory_bytes: self.max_memory_bytes,
302 }));
303 }
304 return Err(err);
305 }
306 };
307 let run_ms = duration_ms(run_start.elapsed());
308
309 match result {
310 InvokeResult::Ok(output_json) => Ok(InvokeOutcome {
311 output_json,
312 instantiate_ms,
313 run_ms,
314 }),
315 InvokeResult::Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
316 code: err.code,
317 message: err.message,
318 retryable: err.retryable,
319 backoff_ms: err.backoff_ms,
320 details: err.details,
321 })),
322 }
323 }
324
325 pub fn state_dump(&self) -> Vec<StateDumpEntry> {
326 self.state_store.dump()
327 }
328}
329
330fn make_component_tenant_ctx(tenant: &TenantCtx) -> node::TenantCtx {
331 node::TenantCtx {
332 tenant: tenant.tenant.as_str().to_string(),
333 team: tenant.team.as_ref().map(|t| t.as_str().to_string()),
334 user: tenant.user.as_ref().map(|u| u.as_str().to_string()),
335 trace_id: tenant.trace_id.clone(),
336 i18n_id: tenant.i18n_id.clone(),
337 correlation_id: tenant.correlation_id.clone(),
338 deadline_unix_ms: tenant.deadline.and_then(|deadline| {
339 let millis = deadline.unix_millis();
340 if millis >= 0 {
341 u64::try_from(millis).ok()
342 } else {
343 None
344 }
345 }),
346 attempt: tenant.attempt,
347 idempotency_key: tenant.idempotency_key.clone(),
348 }
349}
350
/// Sets a shared flag to `true` when dropped.
///
/// `invoke` keeps one alive for the duration of a call so that, on every
/// exit path, the watchdog thread can tell the call has finished.
#[must_use = "dropping the guard immediately marks the invocation as done and disables the timeout"]
struct TimeoutGuard {
    /// Flag shared with the watchdog thread.
    done: Arc<AtomicBool>,
}

impl TimeoutGuard {
    /// Wraps `done`; the flag is left untouched until the guard is dropped.
    fn new(done: Arc<AtomicBool>) -> Self {
        Self { done }
    }
}

impl Drop for TimeoutGuard {
    fn drop(&mut self) {
        // Relaxed is enough here: the flag is a standalone signal and does
        // not publish any other data.
        self.done.store(true, Ordering::Relaxed);
    }
}
366
367fn is_timeout_error(err: &anyhow::Error) -> bool {
368 err.chain()
369 .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
370 .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
371}
372
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX` when
/// the value does not fit.
fn duration_ms(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
376
377fn describe_wasm_metadata(bytes: &[u8]) -> String {
378 let mut hasher = Hasher::new();
379 hasher.update(bytes);
380 format!("len={}, blake3:{}", bytes.len(), hasher.finalize().to_hex())
381}