greentic_component/test_harness/
mod.rs1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use greentic_interfaces_host::component::v0_5::exports::greentic::component::node;
9use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::GuestIndices;
10use greentic_types::TenantCtx;
11use serde_json::Value;
12use wasmtime::component::{Component, InstancePre};
13use wasmtime::{Config, Engine, Store};
14
15use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
16use crate::test_harness::secrets::InMemorySecretsStore;
17use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
18
19mod linker;
20mod secrets;
21mod state;
22
23#[derive(Debug)]
24pub struct ComponentInvokeError {
25 pub code: String,
26 pub message: String,
27 pub retryable: bool,
28 pub backoff_ms: Option<u64>,
29 pub details: Option<String>,
30}
31
32impl std::fmt::Display for ComponentInvokeError {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "component error {}: {}", self.code, self.message)
35 }
36}
37
38impl std::error::Error for ComponentInvokeError {}
39
40#[derive(Debug)]
41pub enum HarnessError {
42 Timeout { timeout_ms: u64 },
43 MemoryLimit { max_memory_bytes: usize },
44}
45
46impl std::fmt::Display for HarnessError {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 match self {
49 HarnessError::Timeout { timeout_ms } => {
50 write!(f, "execution exceeded timeout of {timeout_ms}ms")
51 }
52 HarnessError::MemoryLimit { max_memory_bytes } => {
53 write!(
54 f,
55 "execution exceeded memory limit of {max_memory_bytes} bytes"
56 )
57 }
58 }
59 }
60}
61
62impl std::error::Error for HarnessError {}
63
64pub struct HarnessConfig {
65 pub wasm_bytes: Vec<u8>,
66 pub tenant_ctx: TenantCtx,
67 pub flow_id: String,
68 pub node_id: Option<String>,
69 pub state_prefix: String,
70 pub state_seeds: Vec<(String, Vec<u8>)>,
71 pub allow_state_read: bool,
72 pub allow_state_write: bool,
73 pub allow_state_delete: bool,
74 pub allow_secrets: bool,
75 pub allowed_secrets: HashSet<String>,
76 pub secrets: HashMap<String, String>,
77 pub wasi_preopens: Vec<WasiPreopen>,
78 pub config: Option<Value>,
79 pub allow_http: bool,
80 pub timeout_ms: u64,
81 pub max_memory_bytes: usize,
82}
83
84#[derive(Clone, Debug)]
85pub struct WasiPreopen {
86 pub host_path: PathBuf,
87 pub guest_path: String,
88 pub read_only: bool,
89}
90
91impl WasiPreopen {
92 pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
93 Self {
94 host_path: host_path.into(),
95 guest_path: guest_path.into(),
96 read_only: false,
97 }
98 }
99
100 pub fn read_only(mut self, value: bool) -> Self {
101 self.read_only = value;
102 self
103 }
104}
105
106pub struct TestHarness {
107 engine: Engine,
108 instance_pre: InstancePre<HostState>,
109 guest_indices: GuestIndices,
110 state_store: Arc<InMemoryStateStore>,
111 secrets_store: Arc<InMemorySecretsStore>,
112 state_scope: StateScope,
113 allow_state_read: bool,
114 allow_state_write: bool,
115 allow_state_delete: bool,
116 exec_ctx: node::ExecCtx,
117 wasi_preopens: Vec<WasiPreopen>,
118 config_json: Option<String>,
119 allow_http: bool,
120 timeout_ms: u64,
121 max_memory_bytes: usize,
122}
123
124pub struct InvokeOutcome {
125 pub output_json: String,
126 pub instantiate_ms: u64,
127 pub run_ms: u64,
128}
129
130impl TestHarness {
131 pub fn new(config: HarnessConfig) -> Result<Self> {
132 let mut wasmtime_config = Config::new();
133 wasmtime_config.wasm_component_model(true);
134 wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
135 wasmtime_config.epoch_interruption(true);
136 let engine = Engine::new(&wasmtime_config).context("create wasmtime engine")?;
137
138 let component =
139 Component::from_binary(&engine, &config.wasm_bytes).context("load component wasm")?;
140
141 let linker = build_linker(&engine)?;
142 let instance_pre = linker
143 .instantiate_pre(&component)
144 .context("prepare component instance")?;
145 let guest_indices = GuestIndices::new(&instance_pre).context("load guest indices")?;
146
147 let state_store = Arc::new(InMemoryStateStore::new());
148 let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
149 let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
150 let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
151 for (key, value) in config.state_seeds {
152 state_store.write(&scope, &key, value);
153 }
154
155 let exec_ctx = node::ExecCtx {
156 tenant: make_component_tenant_ctx(&config.tenant_ctx),
157 flow_id: config.flow_id,
158 node_id: config.node_id,
159 };
160
161 let config_json = match config.config {
162 Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
163 None => None,
164 };
165
166 Ok(Self {
167 engine,
168 instance_pre,
169 guest_indices,
170 state_store,
171 secrets_store,
172 state_scope: scope,
173 allow_state_read: config.allow_state_read,
174 allow_state_write: config.allow_state_write,
175 allow_state_delete: config.allow_state_delete,
176 exec_ctx,
177 wasi_preopens: config.wasi_preopens,
178 config_json,
179 allow_http: config.allow_http,
180 timeout_ms: config.timeout_ms,
181 max_memory_bytes: config.max_memory_bytes,
182 })
183 }
184
185 pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
186 let host_state = HostState::new(HostStateConfig {
187 base_scope: self.state_scope.clone(),
188 state_store: self.state_store.clone(),
189 secrets: self.secrets_store.clone(),
190 allow_state_read: self.allow_state_read,
191 allow_state_write: self.allow_state_write,
192 allow_state_delete: self.allow_state_delete,
193 wasi_preopens: self.wasi_preopens.clone(),
194 allow_http: self.allow_http,
195 config_json: self.config_json.clone(),
196 max_memory_bytes: self.max_memory_bytes,
197 })
198 .context("build WASI context")?;
199 let mut store = Store::new(&self.engine, host_state);
200 store.limiter(|state| state.limits_mut());
201 store.set_epoch_deadline(1);
202
203 let done = Arc::new(AtomicBool::new(false));
204 let _timeout_guard = TimeoutGuard::new(done.clone());
205 let engine = self.engine.clone();
206 let timeout_ms = self.timeout_ms;
207 std::thread::spawn(move || {
208 std::thread::sleep(Duration::from_millis(timeout_ms));
209 if !done.load(Ordering::Relaxed) {
210 engine.increment_epoch();
211 }
212 });
213
214 let instantiate_start = Instant::now();
215 let instance = self
216 .instance_pre
217 .instantiate(&mut store)
218 .context("instantiate component")
219 .and_then(|instance| {
220 self.guest_indices
221 .load(&mut store, &instance)
222 .context("load component exports")
223 .map(|exports| (instance, exports))
224 });
225
226 let (_instance, exports) = match instance {
227 Ok(value) => value,
228 Err(err) => {
229 if is_timeout_error(&err) {
230 return Err(anyhow::Error::new(HarnessError::Timeout {
231 timeout_ms: self.timeout_ms,
232 }));
233 }
234 if store.data().memory_limit_hit() {
235 return Err(anyhow::Error::new(HarnessError::MemoryLimit {
236 max_memory_bytes: self.max_memory_bytes,
237 }));
238 }
239 return Err(err);
240 }
241 };
242 let instantiate_ms = duration_ms(instantiate_start.elapsed());
243
244 let input = serde_json::to_string(input_json).context("serialize input json")?;
245 let run_start = Instant::now();
246 let result = exports
247 .call_invoke(&mut store, &self.exec_ctx, operation, &input)
248 .context("invoke component");
249
250 use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::InvokeResult;
251
252 let result = match result {
253 Ok(result) => result,
254 Err(err) => {
255 if is_timeout_error(&err) {
256 return Err(anyhow::Error::new(HarnessError::Timeout {
257 timeout_ms: self.timeout_ms,
258 }));
259 }
260 if store.data().memory_limit_hit() {
261 return Err(anyhow::Error::new(HarnessError::MemoryLimit {
262 max_memory_bytes: self.max_memory_bytes,
263 }));
264 }
265 return Err(err);
266 }
267 };
268 let run_ms = duration_ms(run_start.elapsed());
269
270 match result {
271 InvokeResult::Ok(output_json) => Ok(InvokeOutcome {
272 output_json,
273 instantiate_ms,
274 run_ms,
275 }),
276 InvokeResult::Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
277 code: err.code,
278 message: err.message,
279 retryable: err.retryable,
280 backoff_ms: err.backoff_ms,
281 details: err.details,
282 })),
283 }
284 }
285
286 pub fn state_dump(&self) -> Vec<StateDumpEntry> {
287 self.state_store.dump()
288 }
289}
290
291fn make_component_tenant_ctx(tenant: &TenantCtx) -> node::TenantCtx {
292 node::TenantCtx {
293 tenant: tenant.tenant.as_str().to_string(),
294 team: tenant.team.as_ref().map(|t| t.as_str().to_string()),
295 user: tenant.user.as_ref().map(|u| u.as_str().to_string()),
296 trace_id: tenant.trace_id.clone(),
297 correlation_id: tenant.correlation_id.clone(),
298 deadline_unix_ms: tenant.deadline.and_then(|deadline| {
299 let millis = deadline.unix_millis();
300 if millis >= 0 {
301 u64::try_from(millis).ok()
302 } else {
303 None
304 }
305 }),
306 attempt: tenant.attempt,
307 idempotency_key: tenant.idempotency_key.clone(),
308 }
309}
310
311struct TimeoutGuard {
312 done: Arc<AtomicBool>,
313}
314
315impl TimeoutGuard {
316 fn new(done: Arc<AtomicBool>) -> Self {
317 Self { done }
318 }
319}
320
321impl Drop for TimeoutGuard {
322 fn drop(&mut self) {
323 self.done.store(true, Ordering::Relaxed);
324 }
325}
326
327fn is_timeout_error(err: &anyhow::Error) -> bool {
328 err.chain()
329 .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
330 .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
331}
332
333fn duration_ms(duration: Duration) -> u64 {
334 duration.as_millis().try_into().unwrap_or(u64::MAX)
335}