1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use blake3::Hasher;
9use greentic_interfaces_host::component::v0_5::exports::greentic::component::node;
10use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::GuestIndices;
11use greentic_interfaces_host::component_v0_6;
12use greentic_types::TenantCtx;
13use greentic_types::cbor::canonical;
14use serde_json::Value;
15use wasmtime::component::{Component, InstancePre, Linker};
16use wasmtime::{Config, Engine, Store};
17
18use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
19use crate::test_harness::secrets::InMemorySecretsStore;
20use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
21
22mod linker;
23mod secrets;
24mod state;
25
/// Component ABI generation the harness knows how to drive.
///
/// The value is chosen once per component by `detect_component_abi` and is
/// consulted by `TestHarness::invoke` to select the matching export bindings.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ComponentAbi {
    /// The `v0.5` node interface.
    V0_5,
    /// The `v0.6` node interface.
    V0_6,
}
31
/// Failure returned by the component itself from its `invoke` export.
///
/// Only `code` and `message` are included in the `Display` output; the
/// remaining fields are available to callers that downcast to this type.
#[derive(Debug)]
pub struct ComponentInvokeError {
    /// Error code supplied by the component.
    pub code: String,
    /// Human-readable message supplied by the component.
    pub message: String,
    /// Retry flag copied from the component's error.
    pub retryable: bool,
    /// Backoff value copied from the component's error, if any
    /// (presumably a suggested delay in milliseconds; confirm against the interface).
    pub backoff_ms: Option<u64>,
    /// Optional extra detail text attached to the error.
    pub details: Option<String>,
}

impl std::error::Error for ComponentInvokeError {}

impl std::fmt::Display for ComponentInvokeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message, .. } = self;
        write!(f, "component error {code}: {message}")
    }
}
48
/// Resource-limit failures raised by the harness rather than by the component.
#[derive(Debug)]
pub enum HarnessError {
    /// Execution was interrupted after running longer than `timeout_ms`.
    Timeout { timeout_ms: u64 },
    /// Execution hit the configured memory ceiling of `max_memory_bytes`.
    MemoryLimit { max_memory_bytes: usize },
}

impl std::error::Error for HarnessError {}

impl std::fmt::Display for HarnessError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Timeout { timeout_ms } => f.write_fmt(format_args!(
                "execution exceeded timeout of {timeout_ms}ms"
            )),
            Self::MemoryLimit { max_memory_bytes } => f.write_fmt(format_args!(
                "execution exceeded memory limit of {max_memory_bytes} bytes"
            )),
        }
    }
}
72
73pub struct HarnessConfig {
74 pub wasm_bytes: Vec<u8>,
75 pub tenant_ctx: TenantCtx,
76 pub flow_id: String,
77 pub node_id: Option<String>,
78 pub state_prefix: String,
79 pub state_seeds: Vec<(String, Vec<u8>)>,
80 pub allow_state_read: bool,
81 pub allow_state_write: bool,
82 pub allow_state_delete: bool,
83 pub allow_secrets: bool,
84 pub allowed_secrets: HashSet<String>,
85 pub secrets: HashMap<String, String>,
86 pub wasi_preopens: Vec<WasiPreopen>,
87 pub config: Option<Value>,
88 pub allow_http: bool,
89 pub timeout_ms: u64,
90 pub max_memory_bytes: usize,
91}
92
/// A host directory exposed to the guest at a given guest path.
#[derive(Clone, Debug)]
pub struct WasiPreopen {
    /// Directory on the host.
    pub host_path: PathBuf,
    /// Path under which the guest sees the directory.
    pub guest_path: String,
    /// Read-only flag for the mapping; `false` unless set via [`WasiPreopen::read_only`].
    pub read_only: bool,
}

impl WasiPreopen {
    /// Creates a preopen mapping `host_path` to `guest_path` with `read_only` set to `false`.
    pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
        let host_path: PathBuf = host_path.into();
        let guest_path: String = guest_path.into();
        Self {
            host_path,
            guest_path,
            read_only: false,
        }
    }

    /// Returns the mapping with its `read_only` flag replaced by `value`.
    pub fn read_only(self, value: bool) -> Self {
        Self {
            read_only: value,
            ..self
        }
    }
}
114
115pub struct TestHarness {
116 engine: Engine,
117 component: Component,
118 linker: Linker<HostState>,
119 instance_pre: InstancePre<HostState>,
120 guest_indices: Option<GuestIndices>,
121 abi: ComponentAbi,
122 state_store: Arc<InMemoryStateStore>,
123 secrets_store: Arc<InMemorySecretsStore>,
124 state_scope: StateScope,
125 allow_state_read: bool,
126 allow_state_write: bool,
127 allow_state_delete: bool,
128 tenant_ctx: TenantCtx,
129 exec_ctx: node::ExecCtx,
130 wasi_preopens: Vec<WasiPreopen>,
131 config_json: Option<String>,
132 allow_http: bool,
133 timeout_ms: u64,
134 max_memory_bytes: usize,
135 wasm_bytes_metadata: String,
136}
137
/// Result of one successful `TestHarness::invoke` call.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy and
/// compare outcomes in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvokeOutcome {
    /// Component output rendered as a JSON string.
    pub output_json: String,
    /// Milliseconds spent instantiating the component.
    pub instantiate_ms: u64,
    /// Milliseconds spent inside the component's `invoke` export.
    pub run_ms: u64,
}
143
144impl TestHarness {
145 pub fn new(config: HarnessConfig) -> Result<Self> {
146 let mut wasmtime_config = Config::new();
147 wasmtime_config.wasm_component_model(true);
148 wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
149 wasmtime_config.epoch_interruption(true);
150 let engine = Engine::new(&wasmtime_config)
151 .map_err(|err| anyhow::anyhow!("create wasmtime engine: {err}"))?;
152
153 let component = Component::from_binary(&engine, &config.wasm_bytes)
154 .map_err(|err| anyhow::anyhow!("load component wasm: {err}"))?;
155 let wasm_bytes_metadata = describe_wasm_metadata(&config.wasm_bytes);
156 let abi = detect_component_abi(&config.wasm_bytes);
157
158 let linker = build_linker(&engine)?;
159 let instance_pre = linker.instantiate_pre(&component).map_err(|err| {
160 anyhow::anyhow!(
161 "prepare component instance (wasm metadata: {}): {}",
162 wasm_bytes_metadata,
163 err
164 )
165 })?;
166 let guest_indices = if abi == ComponentAbi::V0_5 {
167 Some(GuestIndices::new(&instance_pre).map_err(|err| {
168 anyhow::anyhow!(
169 "load guest indices (wasm metadata: {}): {}",
170 wasm_bytes_metadata,
171 err
172 )
173 })?)
174 } else {
175 None
176 };
177
178 let state_store = Arc::new(InMemoryStateStore::new());
179 let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
180 let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
181 let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
182 for (key, value) in config.state_seeds {
183 state_store.write(&scope, &key, value);
184 }
185
186 let exec_ctx = node::ExecCtx {
187 tenant: make_component_tenant_ctx(&config.tenant_ctx),
188 i18n_id: config.tenant_ctx.i18n_id.clone(),
189 flow_id: config.flow_id,
190 node_id: config.node_id,
191 };
192
193 let config_json = match config.config {
194 Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
195 None => None,
196 };
197
198 Ok(Self {
199 engine,
200 component,
201 linker,
202 instance_pre,
203 guest_indices,
204 abi,
205 state_store,
206 secrets_store,
207 state_scope: scope,
208 allow_state_read: config.allow_state_read,
209 allow_state_write: config.allow_state_write,
210 allow_state_delete: config.allow_state_delete,
211 tenant_ctx: config.tenant_ctx,
212 exec_ctx,
213 wasi_preopens: config.wasi_preopens,
214 config_json,
215 allow_http: config.allow_http,
216 timeout_ms: config.timeout_ms,
217 max_memory_bytes: config.max_memory_bytes,
218 wasm_bytes_metadata,
219 })
220 }
221
222 pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
223 let host_state = HostState::new(HostStateConfig {
224 base_scope: self.state_scope.clone(),
225 state_store: self.state_store.clone(),
226 secrets: self.secrets_store.clone(),
227 allow_state_read: self.allow_state_read,
228 allow_state_write: self.allow_state_write,
229 allow_state_delete: self.allow_state_delete,
230 wasi_preopens: self.wasi_preopens.clone(),
231 allow_http: self.allow_http,
232 config_json: self.config_json.clone(),
233 max_memory_bytes: self.max_memory_bytes,
234 })
235 .context("build WASI context")?;
236 let mut store = Store::new(&self.engine, host_state);
237 store.limiter(|state| state.limits_mut());
238 store.set_epoch_deadline(1);
239
240 let done = Arc::new(AtomicBool::new(false));
241 let _timeout_guard = TimeoutGuard::new(done.clone());
242 let engine = self.engine.clone();
243 let timeout_ms = self.timeout_ms;
244 std::thread::spawn(move || {
245 std::thread::sleep(Duration::from_millis(timeout_ms));
246 if !done.load(Ordering::Relaxed) {
247 engine.increment_epoch();
248 }
249 });
250
251 let instantiate_start = Instant::now();
252 match self.abi {
253 ComponentAbi::V0_5 => {
254 let guest_indices = self
255 .guest_indices
256 .as_ref()
257 .context("missing v0.5 guest indices")?;
258 let instance = self
259 .instance_pre
260 .instantiate(&mut store)
261 .map_err(|err| anyhow::anyhow!("instantiate component: {err}"))
262 .and_then(|instance| {
263 guest_indices
264 .load(&mut store, &instance)
265 .map_err(|err| anyhow::anyhow!("load component exports: {err}"))
266 .map(|exports| (instance, exports))
267 })
268 .with_context(|| {
269 format!(
270 "failed to prepare component instance (wasm metadata: {})",
271 self.wasm_bytes_metadata
272 )
273 });
274
275 let (_instance, exports) = match instance {
276 Ok(value) => value,
277 Err(err) => {
278 return map_invoke_error(
279 err,
280 &store,
281 self.timeout_ms,
282 self.max_memory_bytes,
283 );
284 }
285 };
286 let instantiate_ms = duration_ms(instantiate_start.elapsed());
287
288 let input = serde_json::to_string(input_json).context("serialize input json")?;
289 let run_start = Instant::now();
290 let result = exports
291 .call_invoke(&mut store, &self.exec_ctx, operation, &input)
292 .map_err(|err| anyhow::anyhow!("invoke component: {err}"));
293
294 use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::InvokeResult;
295
296 let result = match result {
297 Ok(result) => result,
298 Err(err) => {
299 return map_invoke_error(
300 err,
301 &store,
302 self.timeout_ms,
303 self.max_memory_bytes,
304 );
305 }
306 };
307 let run_ms = duration_ms(run_start.elapsed());
308
309 match result {
310 InvokeResult::Ok(output_json) => Ok(InvokeOutcome {
311 output_json,
312 instantiate_ms,
313 run_ms,
314 }),
315 InvokeResult::Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
316 code: err.code,
317 message: err.message,
318 retryable: err.retryable,
319 backoff_ms: err.backoff_ms,
320 details: err.details,
321 })),
322 }
323 }
324 ComponentAbi::V0_6 => {
325 let exports = component_v0_6::ComponentV0V6V0::instantiate(
326 &mut store,
327 &self.component,
328 &self.linker,
329 )
330 .map_err(|err| anyhow::anyhow!("instantiate component: {err}"))
331 .with_context(|| {
332 format!(
333 "failed to prepare component instance (wasm metadata: {})",
334 self.wasm_bytes_metadata
335 )
336 });
337 let exports = match exports {
338 Ok(value) => value,
339 Err(err) => {
340 return map_invoke_error(
341 err,
342 &store,
343 self.timeout_ms,
344 self.max_memory_bytes,
345 );
346 }
347 };
348 let instantiate_ms = duration_ms(instantiate_start.elapsed());
349
350 let mut payload = input_json.clone();
351 if !payload.is_object() {
352 payload = serde_json::json!({ "input": payload });
353 }
354 if let Some(object) = payload.as_object_mut()
355 && !object.contains_key("operation")
356 {
357 object.insert(
358 "operation".to_string(),
359 Value::String(operation.to_string()),
360 );
361 }
362
363 let input = canonical::to_canonical_cbor_allow_floats(&payload)
364 .context("encode invoke payload to cbor")?;
365 let invoke_envelope =
366 component_v0_6::exports::greentic::component::node::InvocationEnvelope {
367 ctx: make_component_tenant_ctx_v0_6(&self.tenant_ctx),
368 flow_id: self.exec_ctx.flow_id.clone(),
369 step_id: self
370 .exec_ctx
371 .node_id
372 .clone()
373 .unwrap_or_else(|| operation.to_string()),
374 component_id: self
375 .exec_ctx
376 .node_id
377 .clone()
378 .unwrap_or_else(|| "component".to_string()),
379 attempt: self.tenant_ctx.attempt,
380 payload_cbor: input,
381 metadata_cbor: None,
382 };
383
384 let run_start = Instant::now();
385 let result = exports
386 .greentic_component_node()
387 .call_invoke(&mut store, operation, &invoke_envelope)
388 .map_err(|err| anyhow::anyhow!("invoke component: {err}"));
389 let result = match result {
390 Ok(value) => value,
391 Err(err) => {
392 return map_invoke_error(
393 err,
394 &store,
395 self.timeout_ms,
396 self.max_memory_bytes,
397 );
398 }
399 };
400 let run_ms = duration_ms(run_start.elapsed());
401 match result {
402 Ok(result) => {
403 let output_value: Value = canonical::from_cbor(&result.output_cbor)
404 .context("decode invoke output cbor")?;
405 let output_json = serde_json::to_string(&output_value)
406 .context("serialize invoke output json")?;
407 Ok(InvokeOutcome {
408 output_json,
409 instantiate_ms,
410 run_ms,
411 })
412 }
413 Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
414 code: err.code,
415 message: err.message,
416 retryable: err.retryable,
417 backoff_ms: err.backoff_ms,
418 details: err
419 .details
420 .as_ref()
421 .and_then(|bytes| canonical::from_cbor::<Value>(bytes).ok())
422 .and_then(|value| serde_json::to_string(&value).ok()),
423 })),
424 }
425 }
426 }
427 }
428
429 pub fn state_dump(&self) -> Vec<StateDumpEntry> {
430 self.state_store.dump()
431 }
432}
433
434fn make_component_tenant_ctx(tenant: &TenantCtx) -> node::TenantCtx {
435 node::TenantCtx {
436 tenant: tenant.tenant.as_str().to_string(),
437 team: tenant.team.as_ref().map(|t| t.as_str().to_string()),
438 user: tenant.user.as_ref().map(|u| u.as_str().to_string()),
439 trace_id: tenant.trace_id.clone(),
440 i18n_id: tenant.i18n_id.clone(),
441 correlation_id: tenant.correlation_id.clone(),
442 deadline_unix_ms: tenant
443 .deadline
444 .and_then(|deadline| u64::try_from(deadline.unix_millis()).ok()),
445 attempt: tenant.attempt,
446 idempotency_key: tenant.idempotency_key.clone(),
447 }
448}
449
450fn make_component_tenant_ctx_v0_6(
451 tenant: &TenantCtx,
452) -> component_v0_6::exports::greentic::component::node::TenantCtx {
453 component_v0_6::exports::greentic::component::node::TenantCtx {
454 tenant_id: tenant.tenant_id.as_str().to_string(),
455 team_id: tenant.team_id.as_ref().map(|t| t.as_str().to_string()),
456 user_id: tenant.user_id.as_ref().map(|u| u.as_str().to_string()),
457 env_id: tenant.env.as_str().to_string(),
458 trace_id: tenant
459 .trace_id
460 .clone()
461 .unwrap_or_else(|| "trace-local".to_string()),
462 correlation_id: tenant
463 .correlation_id
464 .clone()
465 .unwrap_or_else(|| "corr-local".to_string()),
466 deadline_ms: tenant
467 .deadline
468 .and_then(|deadline| u64::try_from(deadline.unix_millis()).ok())
469 .unwrap_or(0),
470 attempt: tenant.attempt,
471 idempotency_key: tenant.idempotency_key.clone(),
472 i18n_id: tenant.i18n_id.clone().unwrap_or_else(|| "en".to_string()),
473 }
474}
475
/// Scope guard that flips a shared flag to `true` when it is dropped.
///
/// `TestHarness::invoke` holds one for the duration of a call so its watchdog
/// thread can see that the invocation has already finished.
struct TimeoutGuard {
    /// Flag shared with the watchdog thread.
    done: Arc<AtomicBool>,
}

impl Drop for TimeoutGuard {
    fn drop(&mut self) {
        let finished = true;
        self.done.store(finished, Ordering::Relaxed);
    }
}

impl TimeoutGuard {
    /// Wraps `done`; the flag is left untouched until the guard is dropped.
    fn new(done: Arc<AtomicBool>) -> Self {
        TimeoutGuard { done }
    }
}
491
492fn is_timeout_error(err: &anyhow::Error) -> bool {
493 err.chain()
494 .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
495 .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
496}
497
/// Whole milliseconds in `duration`, saturating at `u64::MAX` when the value does not fit.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    match u64::try_from(millis) {
        Ok(value) => value,
        Err(_) => u64::MAX,
    }
}
501
502fn map_invoke_error(
503 err: anyhow::Error,
504 store: &Store<HostState>,
505 timeout_ms: u64,
506 max_memory_bytes: usize,
507) -> Result<InvokeOutcome> {
508 if is_timeout_error(&err) {
509 return Err(anyhow::Error::new(HarnessError::Timeout { timeout_ms }));
510 }
511 if store.data().memory_limit_hit() {
512 return Err(anyhow::Error::new(HarnessError::MemoryLimit {
513 max_memory_bytes,
514 }));
515 }
516 Err(err)
517}
518
519fn detect_component_abi(bytes: &[u8]) -> ComponentAbi {
520 if let Ok(decoded) = crate::wasm::decode_world(bytes) {
521 let world = &decoded.resolve.worlds[decoded.world];
522 if world.name == "component-v0-v5-v0" {
523 return ComponentAbi::V0_5;
524 }
525 }
526 ComponentAbi::V0_6
527}
528
529fn describe_wasm_metadata(bytes: &[u8]) -> String {
530 let mut hasher = Hasher::new();
531 hasher.update(bytes);
532 format!("len={}, blake3:{}", bytes.len(), hasher.finalize().to_hex())
533}