Skip to main content

greentic_component/test_harness/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use blake3::Hasher;
9use greentic_interfaces_host::component_v0_6;
10use greentic_types::TenantCtx;
11use greentic_types::cbor::canonical;
12use serde_json::Value;
13use wasmtime::component::{Component, Linker};
14use wasmtime::{Config, Engine, Store};
15
16use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
17use crate::test_harness::secrets::InMemorySecretsStore;
18use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
19
20mod linker;
21mod secrets;
22mod state;
23
24#[derive(Debug)]
25pub struct ComponentInvokeError {
26    pub code: String,
27    pub message: String,
28    pub retryable: bool,
29    pub backoff_ms: Option<u64>,
30    pub details: Option<String>,
31}
32
33impl std::fmt::Display for ComponentInvokeError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "component error {}: {}", self.code, self.message)
36    }
37}
38
39impl std::error::Error for ComponentInvokeError {}
40
41#[derive(Debug)]
42pub enum HarnessError {
43    Timeout { timeout_ms: u64 },
44    MemoryLimit { max_memory_bytes: usize },
45}
46
47impl std::fmt::Display for HarnessError {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            HarnessError::Timeout { timeout_ms } => {
51                write!(f, "execution exceeded timeout of {timeout_ms}ms")
52            }
53            HarnessError::MemoryLimit { max_memory_bytes } => {
54                write!(
55                    f,
56                    "execution exceeded memory limit of {max_memory_bytes} bytes"
57                )
58            }
59        }
60    }
61}
62
63impl std::error::Error for HarnessError {}
64
65pub struct HarnessConfig {
66    pub wasm_bytes: Vec<u8>,
67    pub tenant_ctx: TenantCtx,
68    pub flow_id: String,
69    pub node_id: Option<String>,
70    pub state_prefix: String,
71    pub state_seeds: Vec<(String, Vec<u8>)>,
72    pub allow_state_read: bool,
73    pub allow_state_write: bool,
74    pub allow_state_delete: bool,
75    pub allow_secrets: bool,
76    pub allowed_secrets: HashSet<String>,
77    pub secrets: HashMap<String, String>,
78    pub wasi_preopens: Vec<WasiPreopen>,
79    pub config: Option<Value>,
80    pub allow_http: bool,
81    pub timeout_ms: u64,
82    pub max_memory_bytes: usize,
83}
84
85#[derive(Clone, Debug)]
86pub struct WasiPreopen {
87    pub host_path: PathBuf,
88    pub guest_path: String,
89    pub read_only: bool,
90}
91
92impl WasiPreopen {
93    pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
94        Self {
95            host_path: host_path.into(),
96            guest_path: guest_path.into(),
97            read_only: false,
98        }
99    }
100
101    pub fn read_only(mut self, value: bool) -> Self {
102        self.read_only = value;
103        self
104    }
105}
106
107pub struct TestHarness {
108    engine: Engine,
109    component: Component,
110    linker: Linker<HostState>,
111    state_store: Arc<InMemoryStateStore>,
112    secrets_store: Arc<InMemorySecretsStore>,
113    state_scope: StateScope,
114    allow_state_read: bool,
115    allow_state_write: bool,
116    allow_state_delete: bool,
117    tenant_ctx: TenantCtx,
118    flow_id: String,
119    node_id: Option<String>,
120    wasi_preopens: Vec<WasiPreopen>,
121    config_json: Option<String>,
122    allow_http: bool,
123    timeout_ms: u64,
124    max_memory_bytes: usize,
125    wasm_bytes_metadata: String,
126}
127
128pub struct InvokeOutcome {
129    pub output_json: String,
130    pub instantiate_ms: u64,
131    pub run_ms: u64,
132}
133
134impl TestHarness {
135    pub fn new(config: HarnessConfig) -> Result<Self> {
136        let mut wasmtime_config = Config::new();
137        wasmtime_config.wasm_component_model(true);
138        wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
139        wasmtime_config.epoch_interruption(true);
140        let engine = Engine::new(&wasmtime_config)
141            .map_err(|err| anyhow::anyhow!("create wasmtime engine: {err}"))?;
142
143        let component = Component::from_binary(&engine, &config.wasm_bytes)
144            .map_err(|err| anyhow::anyhow!("load component wasm: {err}"))?;
145        let wasm_bytes_metadata = describe_wasm_metadata(&config.wasm_bytes);
146
147        let linker = build_linker(&engine)?;
148        linker.instantiate_pre(&component).map_err(|err| {
149            anyhow::anyhow!(
150                "prepare component instance (wasm metadata: {}): {}",
151                wasm_bytes_metadata,
152                err
153            )
154        })?;
155
156        let state_store = Arc::new(InMemoryStateStore::new());
157        let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
158        let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
159        let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
160        for (key, value) in config.state_seeds {
161            state_store.write(&scope, &key, value);
162        }
163
164        let config_json = match config.config {
165            Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
166            None => None,
167        };
168
169        Ok(Self {
170            engine,
171            component,
172            linker,
173            state_store,
174            secrets_store,
175            state_scope: scope,
176            allow_state_read: config.allow_state_read,
177            allow_state_write: config.allow_state_write,
178            allow_state_delete: config.allow_state_delete,
179            tenant_ctx: config.tenant_ctx,
180            flow_id: config.flow_id,
181            node_id: config.node_id,
182            wasi_preopens: config.wasi_preopens,
183            config_json,
184            allow_http: config.allow_http,
185            timeout_ms: config.timeout_ms,
186            max_memory_bytes: config.max_memory_bytes,
187            wasm_bytes_metadata,
188        })
189    }
190
191    pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
192        let host_state = HostState::new(HostStateConfig {
193            base_scope: self.state_scope.clone(),
194            state_store: self.state_store.clone(),
195            secrets: self.secrets_store.clone(),
196            allow_state_read: self.allow_state_read,
197            allow_state_write: self.allow_state_write,
198            allow_state_delete: self.allow_state_delete,
199            wasi_preopens: self.wasi_preopens.clone(),
200            allow_http: self.allow_http,
201            config_json: self.config_json.clone(),
202            max_memory_bytes: self.max_memory_bytes,
203        })
204        .context("build WASI context")?;
205        let mut store = Store::new(&self.engine, host_state);
206        store.limiter(|state| state.limits_mut());
207        store.set_epoch_deadline(1);
208
209        let done = Arc::new(AtomicBool::new(false));
210        let _timeout_guard = TimeoutGuard::new(done.clone());
211        let engine = self.engine.clone();
212        let timeout_ms = self.timeout_ms;
213        std::thread::spawn(move || {
214            std::thread::sleep(Duration::from_millis(timeout_ms));
215            if !done.load(Ordering::Relaxed) {
216                engine.increment_epoch();
217            }
218        });
219
220        let instantiate_start = Instant::now();
221        let exports =
222            component_v0_6::ComponentV0V6V0::instantiate(&mut store, &self.component, &self.linker)
223                .map_err(|err| anyhow::anyhow!("instantiate component: {err}"))
224                .with_context(|| {
225                    format!(
226                        "failed to prepare component instance (wasm metadata: {})",
227                        self.wasm_bytes_metadata
228                    )
229                });
230        let exports = match exports {
231            Ok(value) => value,
232            Err(err) => {
233                return map_invoke_error(err, &store, self.timeout_ms, self.max_memory_bytes);
234            }
235        };
236        let instantiate_ms = duration_ms(instantiate_start.elapsed());
237
238        let mut payload = input_json.clone();
239        if !payload.is_object() {
240            payload = serde_json::json!({ "input": payload });
241        }
242        if let Some(object) = payload.as_object_mut()
243            && !object.contains_key("operation")
244        {
245            object.insert(
246                "operation".to_string(),
247                Value::String(operation.to_string()),
248            );
249        }
250
251        let input = canonical::to_canonical_cbor_allow_floats(&payload)
252            .context("encode invoke payload to cbor")?;
253        let invoke_envelope =
254            component_v0_6::exports::greentic::component::node::InvocationEnvelope {
255                ctx: make_component_tenant_ctx_v0_6(&self.tenant_ctx),
256                flow_id: self.flow_id.clone(),
257                step_id: self
258                    .node_id
259                    .clone()
260                    .unwrap_or_else(|| operation.to_string()),
261                component_id: self
262                    .node_id
263                    .clone()
264                    .unwrap_or_else(|| "component".to_string()),
265                attempt: self.tenant_ctx.attempt,
266                payload_cbor: input,
267                metadata_cbor: None,
268            };
269
270        let run_start = Instant::now();
271        let result = exports
272            .greentic_component_node()
273            .call_invoke(&mut store, operation, &invoke_envelope)
274            .map_err(|err| anyhow::anyhow!("invoke component: {err}"));
275        let result = match result {
276            Ok(value) => value,
277            Err(err) => {
278                return map_invoke_error(err, &store, self.timeout_ms, self.max_memory_bytes);
279            }
280        };
281        let run_ms = duration_ms(run_start.elapsed());
282        match result {
283            Ok(result) => {
284                let output_value: Value = canonical::from_cbor(&result.output_cbor)
285                    .context("decode invoke output cbor")?;
286                let output_json =
287                    serde_json::to_string(&output_value).context("serialize invoke output json")?;
288                Ok(InvokeOutcome {
289                    output_json,
290                    instantiate_ms,
291                    run_ms,
292                })
293            }
294            Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
295                code: err.code,
296                message: err.message,
297                retryable: err.retryable,
298                backoff_ms: err.backoff_ms,
299                details: err
300                    .details
301                    .as_ref()
302                    .and_then(|bytes| canonical::from_cbor::<Value>(bytes).ok())
303                    .and_then(|value| serde_json::to_string(&value).ok()),
304            })),
305        }
306    }
307
308    pub fn state_dump(&self) -> Vec<StateDumpEntry> {
309        self.state_store.dump()
310    }
311}
312
313fn make_component_tenant_ctx_v0_6(
314    tenant: &TenantCtx,
315) -> component_v0_6::exports::greentic::component::node::TenantCtx {
316    component_v0_6::exports::greentic::component::node::TenantCtx {
317        tenant_id: tenant.tenant_id.as_str().to_string(),
318        team_id: tenant.team_id.as_ref().map(|t| t.as_str().to_string()),
319        user_id: tenant.user_id.as_ref().map(|u| u.as_str().to_string()),
320        env_id: tenant.env.as_str().to_string(),
321        trace_id: tenant
322            .trace_id
323            .clone()
324            .unwrap_or_else(|| "trace-local".to_string()),
325        correlation_id: tenant
326            .correlation_id
327            .clone()
328            .unwrap_or_else(|| "corr-local".to_string()),
329        deadline_ms: tenant
330            .deadline
331            .and_then(|deadline| u64::try_from(deadline.unix_millis()).ok())
332            .unwrap_or(0),
333        attempt: tenant.attempt,
334        idempotency_key: tenant.idempotency_key.clone(),
335        i18n_id: tenant.i18n_id.clone().unwrap_or_else(|| "en".to_string()),
336    }
337}
338
339struct TimeoutGuard {
340    done: Arc<AtomicBool>,
341}
342
343impl TimeoutGuard {
344    fn new(done: Arc<AtomicBool>) -> Self {
345        Self { done }
346    }
347}
348
349impl Drop for TimeoutGuard {
350    fn drop(&mut self) {
351        self.done.store(true, Ordering::Relaxed);
352    }
353}
354
355fn is_timeout_error(err: &anyhow::Error) -> bool {
356    err.chain()
357        .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
358        .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
359}
360
361fn duration_ms(duration: Duration) -> u64 {
362    duration.as_millis().try_into().unwrap_or(u64::MAX)
363}
364
365fn map_invoke_error(
366    err: anyhow::Error,
367    store: &Store<HostState>,
368    timeout_ms: u64,
369    max_memory_bytes: usize,
370) -> Result<InvokeOutcome> {
371    if is_timeout_error(&err) {
372        return Err(anyhow::Error::new(HarnessError::Timeout { timeout_ms }));
373    }
374    if store.data().memory_limit_hit() {
375        return Err(anyhow::Error::new(HarnessError::MemoryLimit {
376            max_memory_bytes,
377        }));
378    }
379    Err(err)
380}
381
382fn describe_wasm_metadata(bytes: &[u8]) -> String {
383    let mut hasher = Hasher::new();
384    hasher.update(bytes);
385    format!("len={}, blake3:{}", bytes.len(), hasher.finalize().to_hex())
386}