1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use blake3::Hasher;
9use greentic_interfaces_host::component_v0_6;
10use greentic_types::TenantCtx;
11use greentic_types::cbor::canonical;
12use serde_json::Value;
13use wasmtime::component::{Component, Linker};
14use wasmtime::{Config, Engine, Store};
15
16use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
17use crate::test_harness::secrets::InMemorySecretsStore;
18use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
19
20mod linker;
21mod secrets;
22mod state;
23
/// Error reported by the component itself (as opposed to a host/runtime failure).
///
/// Built by `TestHarness::invoke` from the error value the component returns.
#[derive(Debug)]
pub struct ComponentInvokeError {
    /// Error code supplied by the component.
    pub code: String,
    /// Human-readable message supplied by the component.
    pub message: String,
    /// Retry flag supplied by the component.
    pub retryable: bool,
    /// Optional backoff, in milliseconds, supplied by the component.
    pub backoff_ms: Option<u64>,
    /// Optional details rendered as a JSON string.
    pub details: Option<String>,
}
32
33impl std::fmt::Display for ComponentInvokeError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "component error {}: {}", self.code, self.message)
36 }
37}
38
39impl std::error::Error for ComponentInvokeError {}
40
/// Failures raised by the harness itself when a configured resource limit is exceeded.
#[derive(Debug)]
pub enum HarnessError {
    /// Execution was interrupted after the configured timeout (milliseconds).
    Timeout { timeout_ms: u64 },
    /// Execution hit the configured memory ceiling (bytes).
    MemoryLimit { max_memory_bytes: usize },
}
46
47impl std::fmt::Display for HarnessError {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 HarnessError::Timeout { timeout_ms } => {
51 write!(f, "execution exceeded timeout of {timeout_ms}ms")
52 }
53 HarnessError::MemoryLimit { max_memory_bytes } => {
54 write!(
55 f,
56 "execution exceeded memory limit of {max_memory_bytes} bytes"
57 )
58 }
59 }
60 }
61}
62
63impl std::error::Error for HarnessError {}
64
65pub struct HarnessConfig {
66 pub wasm_bytes: Vec<u8>,
67 pub tenant_ctx: TenantCtx,
68 pub flow_id: String,
69 pub node_id: Option<String>,
70 pub state_prefix: String,
71 pub state_seeds: Vec<(String, Vec<u8>)>,
72 pub allow_state_read: bool,
73 pub allow_state_write: bool,
74 pub allow_state_delete: bool,
75 pub allow_secrets: bool,
76 pub allowed_secrets: HashSet<String>,
77 pub secrets: HashMap<String, String>,
78 pub wasi_preopens: Vec<WasiPreopen>,
79 pub config: Option<Value>,
80 pub allow_http: bool,
81 pub timeout_ms: u64,
82 pub max_memory_bytes: usize,
83}
84
/// Mapping of a host directory to a path visible to the guest.
#[derive(Clone, Debug)]
pub struct WasiPreopen {
    /// Path on the host.
    pub host_path: PathBuf,
    /// Path under which the guest sees it.
    pub guest_path: String,
    /// Read-only flag for the mapping.
    pub read_only: bool,
}
91
92impl WasiPreopen {
93 pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
94 Self {
95 host_path: host_path.into(),
96 guest_path: guest_path.into(),
97 read_only: false,
98 }
99 }
100
101 pub fn read_only(mut self, value: bool) -> Self {
102 self.read_only = value;
103 self
104 }
105}
106
107pub struct TestHarness {
108 engine: Engine,
109 component: Component,
110 linker: Linker<HostState>,
111 state_store: Arc<InMemoryStateStore>,
112 secrets_store: Arc<InMemorySecretsStore>,
113 state_scope: StateScope,
114 allow_state_read: bool,
115 allow_state_write: bool,
116 allow_state_delete: bool,
117 tenant_ctx: TenantCtx,
118 flow_id: String,
119 node_id: Option<String>,
120 wasi_preopens: Vec<WasiPreopen>,
121 config_json: Option<String>,
122 allow_http: bool,
123 timeout_ms: u64,
124 max_memory_bytes: usize,
125 wasm_bytes_metadata: String,
126}
127
/// Result of a successful invocation.
pub struct InvokeOutcome {
    /// Component output rendered as a JSON string.
    pub output_json: String,
    /// Time spent instantiating the component, in milliseconds.
    pub instantiate_ms: u64,
    /// Time spent in the component's invoke call, in milliseconds.
    pub run_ms: u64,
}
133
134impl TestHarness {
135 pub fn new(config: HarnessConfig) -> Result<Self> {
136 let mut wasmtime_config = Config::new();
137 wasmtime_config.wasm_component_model(true);
138 wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
139 wasmtime_config.epoch_interruption(true);
140 let engine = Engine::new(&wasmtime_config)
141 .map_err(|err| anyhow::anyhow!("create wasmtime engine: {err}"))?;
142
143 let component = Component::from_binary(&engine, &config.wasm_bytes)
144 .map_err(|err| anyhow::anyhow!("load component wasm: {err}"))?;
145 let wasm_bytes_metadata = describe_wasm_metadata(&config.wasm_bytes);
146
147 let linker = build_linker(&engine)?;
148 linker.instantiate_pre(&component).map_err(|err| {
149 anyhow::anyhow!(
150 "prepare component instance (wasm metadata: {}): {}",
151 wasm_bytes_metadata,
152 err
153 )
154 })?;
155
156 let state_store = Arc::new(InMemoryStateStore::new());
157 let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
158 let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
159 let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
160 for (key, value) in config.state_seeds {
161 state_store.write(&scope, &key, value);
162 }
163
164 let config_json = match config.config {
165 Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
166 None => None,
167 };
168
169 Ok(Self {
170 engine,
171 component,
172 linker,
173 state_store,
174 secrets_store,
175 state_scope: scope,
176 allow_state_read: config.allow_state_read,
177 allow_state_write: config.allow_state_write,
178 allow_state_delete: config.allow_state_delete,
179 tenant_ctx: config.tenant_ctx,
180 flow_id: config.flow_id,
181 node_id: config.node_id,
182 wasi_preopens: config.wasi_preopens,
183 config_json,
184 allow_http: config.allow_http,
185 timeout_ms: config.timeout_ms,
186 max_memory_bytes: config.max_memory_bytes,
187 wasm_bytes_metadata,
188 })
189 }
190
191 pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
192 let host_state = HostState::new(HostStateConfig {
193 base_scope: self.state_scope.clone(),
194 state_store: self.state_store.clone(),
195 secrets: self.secrets_store.clone(),
196 allow_state_read: self.allow_state_read,
197 allow_state_write: self.allow_state_write,
198 allow_state_delete: self.allow_state_delete,
199 wasi_preopens: self.wasi_preopens.clone(),
200 allow_http: self.allow_http,
201 config_json: self.config_json.clone(),
202 max_memory_bytes: self.max_memory_bytes,
203 })
204 .context("build WASI context")?;
205 let mut store = Store::new(&self.engine, host_state);
206 store.limiter(|state| state.limits_mut());
207 store.set_epoch_deadline(1);
208
209 let done = Arc::new(AtomicBool::new(false));
210 let _timeout_guard = TimeoutGuard::new(done.clone());
211 let engine = self.engine.clone();
212 let timeout_ms = self.timeout_ms;
213 std::thread::spawn(move || {
214 std::thread::sleep(Duration::from_millis(timeout_ms));
215 if !done.load(Ordering::Relaxed) {
216 engine.increment_epoch();
217 }
218 });
219
220 let instantiate_start = Instant::now();
221 let exports =
222 component_v0_6::ComponentV0V6V0::instantiate(&mut store, &self.component, &self.linker)
223 .map_err(|err| anyhow::anyhow!("instantiate component: {err}"))
224 .with_context(|| {
225 format!(
226 "failed to prepare component instance (wasm metadata: {})",
227 self.wasm_bytes_metadata
228 )
229 });
230 let exports = match exports {
231 Ok(value) => value,
232 Err(err) => {
233 return map_invoke_error(err, &store, self.timeout_ms, self.max_memory_bytes);
234 }
235 };
236 let instantiate_ms = duration_ms(instantiate_start.elapsed());
237
238 let mut payload = input_json.clone();
239 if !payload.is_object() {
240 payload = serde_json::json!({ "input": payload });
241 }
242 if let Some(object) = payload.as_object_mut()
243 && !object.contains_key("operation")
244 {
245 object.insert(
246 "operation".to_string(),
247 Value::String(operation.to_string()),
248 );
249 }
250
251 let input = canonical::to_canonical_cbor_allow_floats(&payload)
252 .context("encode invoke payload to cbor")?;
253 let invoke_envelope =
254 component_v0_6::exports::greentic::component::node::InvocationEnvelope {
255 ctx: make_component_tenant_ctx_v0_6(&self.tenant_ctx),
256 flow_id: self.flow_id.clone(),
257 step_id: self
258 .node_id
259 .clone()
260 .unwrap_or_else(|| operation.to_string()),
261 component_id: self
262 .node_id
263 .clone()
264 .unwrap_or_else(|| "component".to_string()),
265 attempt: self.tenant_ctx.attempt,
266 payload_cbor: input,
267 metadata_cbor: None,
268 };
269
270 let run_start = Instant::now();
271 let result = exports
272 .greentic_component_node()
273 .call_invoke(&mut store, operation, &invoke_envelope)
274 .map_err(|err| anyhow::anyhow!("invoke component: {err}"));
275 let result = match result {
276 Ok(value) => value,
277 Err(err) => {
278 return map_invoke_error(err, &store, self.timeout_ms, self.max_memory_bytes);
279 }
280 };
281 let run_ms = duration_ms(run_start.elapsed());
282 match result {
283 Ok(result) => {
284 let output_value: Value = canonical::from_cbor(&result.output_cbor)
285 .context("decode invoke output cbor")?;
286 let output_json =
287 serde_json::to_string(&output_value).context("serialize invoke output json")?;
288 Ok(InvokeOutcome {
289 output_json,
290 instantiate_ms,
291 run_ms,
292 })
293 }
294 Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
295 code: err.code,
296 message: err.message,
297 retryable: err.retryable,
298 backoff_ms: err.backoff_ms,
299 details: err
300 .details
301 .as_ref()
302 .and_then(|bytes| canonical::from_cbor::<Value>(bytes).ok())
303 .and_then(|value| serde_json::to_string(&value).ok()),
304 })),
305 }
306 }
307
308 pub fn state_dump(&self) -> Vec<StateDumpEntry> {
309 self.state_store.dump()
310 }
311}
312
313fn make_component_tenant_ctx_v0_6(
314 tenant: &TenantCtx,
315) -> component_v0_6::exports::greentic::component::node::TenantCtx {
316 component_v0_6::exports::greentic::component::node::TenantCtx {
317 tenant_id: tenant.tenant_id.as_str().to_string(),
318 team_id: tenant.team_id.as_ref().map(|t| t.as_str().to_string()),
319 user_id: tenant.user_id.as_ref().map(|u| u.as_str().to_string()),
320 env_id: tenant.env.as_str().to_string(),
321 trace_id: tenant
322 .trace_id
323 .clone()
324 .unwrap_or_else(|| "trace-local".to_string()),
325 correlation_id: tenant
326 .correlation_id
327 .clone()
328 .unwrap_or_else(|| "corr-local".to_string()),
329 deadline_ms: tenant
330 .deadline
331 .and_then(|deadline| u64::try_from(deadline.unix_millis()).ok())
332 .unwrap_or(0),
333 attempt: tenant.attempt,
334 idempotency_key: tenant.idempotency_key.clone(),
335 i18n_id: tenant.i18n_id.clone().unwrap_or_else(|| "en".to_string()),
336 }
337}
338
/// Guard holding the completion flag that is shared with the timeout watchdog thread.
struct TimeoutGuard {
    /// Flag shared with the watchdog thread.
    done: Arc<AtomicBool>,
}
342
343impl TimeoutGuard {
344 fn new(done: Arc<AtomicBool>) -> Self {
345 Self { done }
346 }
347}
348
349impl Drop for TimeoutGuard {
350 fn drop(&mut self) {
351 self.done.store(true, Ordering::Relaxed);
352 }
353}
354
355fn is_timeout_error(err: &anyhow::Error) -> bool {
356 err.chain()
357 .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
358 .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
359}
360
/// Converts a duration to whole milliseconds, saturating at `u64::MAX` when the
/// millisecond count does not fit in a `u64`.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
364
365fn map_invoke_error(
366 err: anyhow::Error,
367 store: &Store<HostState>,
368 timeout_ms: u64,
369 max_memory_bytes: usize,
370) -> Result<InvokeOutcome> {
371 if is_timeout_error(&err) {
372 return Err(anyhow::Error::new(HarnessError::Timeout { timeout_ms }));
373 }
374 if store.data().memory_limit_hit() {
375 return Err(anyhow::Error::new(HarnessError::MemoryLimit {
376 max_memory_bytes,
377 }));
378 }
379 Err(err)
380}
381
382fn describe_wasm_metadata(bytes: &[u8]) -> String {
383 let mut hasher = Hasher::new();
384 hasher.update(bytes);
385 format!("len={}, blake3:{}", bytes.len(), hasher.finalize().to_hex())
386}