Skip to main content

greentic_component/test_harness/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use blake3::Hasher;
9use greentic_interfaces_host::component::v0_5::exports::greentic::component::node;
10use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::GuestIndices;
11use greentic_interfaces_host::component_v0_6;
12use greentic_types::TenantCtx;
13use greentic_types::cbor::canonical;
14use serde_json::Value;
15use wasmtime::component::{Component, InstancePre, Linker};
16use wasmtime::{Config, Engine, Store};
17
18use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
19use crate::test_harness::secrets::InMemorySecretsStore;
20use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
21
22mod linker;
23mod secrets;
24mod state;
25
/// Component ABI generation the harness drives when invoking a component.
///
/// Selected once per harness by `detect_component_abi` from the decoded world name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ComponentAbi {
    /// Chosen when the decoded world is named `component-v0-v5-v0`.
    V0_5,
    /// Fallback for every other component, including ones whose world cannot be decoded.
    V0_6,
}
31
/// Error reported by the component itself (as opposed to a host/runtime failure).
///
/// The fields are copied from the error value the component returns from `invoke`.
#[derive(Debug)]
pub struct ComponentInvokeError {
    /// Error code supplied by the component.
    pub code: String,
    /// Human-readable message supplied by the component.
    pub message: String,
    /// Retry flag exactly as reported by the component.
    pub retryable: bool,
    /// Backoff value reported by the component, if any.
    pub backoff_ms: Option<u64>,
    /// Additional detail text reported by the component, if any.
    pub details: Option<String>,
}

impl std::fmt::Display for ComponentInvokeError {
    /// Renders as `component error <code>: <message>`; the other fields are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { code, message, .. } = self;
        write!(f, "component error {code}: {message}")
    }
}

impl std::error::Error for ComponentInvokeError {}
48
/// Failures raised by the harness when a resource limit stops an invocation.
#[derive(Debug)]
pub enum HarnessError {
    /// Execution was stopped after the configured timeout (milliseconds).
    Timeout { timeout_ms: u64 },
    /// Execution was stopped after hitting the configured memory limit (bytes).
    MemoryLimit { max_memory_bytes: usize },
}

impl std::fmt::Display for HarnessError {
    /// Writes a one-line description that includes the limit that was exceeded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both payloads are `Copy`, so matching on the dereferenced value binds plain integers.
        match *self {
            Self::Timeout { timeout_ms } => {
                write!(f, "execution exceeded timeout of {timeout_ms}ms")
            }
            Self::MemoryLimit { max_memory_bytes } => write!(
                f,
                "execution exceeded memory limit of {max_memory_bytes} bytes"
            ),
        }
    }
}

impl std::error::Error for HarnessError {}
72
/// Everything `TestHarness::new` needs to load a component and set up its host environment.
pub struct HarnessConfig {
    /// Raw component binary; compiled with `Component::from_binary` and hashed for diagnostics.
    pub wasm_bytes: Vec<u8>,
    /// Tenant context used to derive the state scope and the context handed to the component.
    pub tenant_ctx: TenantCtx,
    /// Flow identifier copied into the execution context passed to the component.
    pub flow_id: String,
    /// Optional node identifier copied into the execution context.
    pub node_id: Option<String>,
    /// Prefix passed to `StateScope::from_tenant_ctx` together with `tenant_ctx`.
    pub state_prefix: String,
    /// `(key, value)` pairs written into the in-memory state store when the harness is built.
    pub state_seeds: Vec<(String, Vec<u8>)>,
    /// Forwarded to the host state on every invocation (presumably gates state reads).
    pub allow_state_read: bool,
    /// Forwarded to the host state on every invocation (presumably gates state writes).
    pub allow_state_write: bool,
    /// Forwarded to the host state on every invocation (presumably gates state deletes).
    pub allow_state_delete: bool,
    /// Passed to `InMemorySecretsStore::new` along with `allowed_secrets`.
    pub allow_secrets: bool,
    /// Secret names passed to `InMemorySecretsStore::new`.
    pub allowed_secrets: HashSet<String>,
    /// Secret values loaded into the secrets store via `with_secrets`.
    pub secrets: HashMap<String, String>,
    /// WASI preopen descriptions forwarded to the host state on every invocation.
    pub wasi_preopens: Vec<WasiPreopen>,
    /// Optional component configuration; serialized to a JSON string when the harness is built.
    pub config: Option<Value>,
    /// Forwarded to the host state on every invocation (presumably gates outbound HTTP).
    pub allow_http: bool,
    /// Milliseconds the watchdog thread waits before bumping the engine epoch.
    pub timeout_ms: u64,
    /// Memory limit in bytes forwarded to the host state and reported in `HarnessError::MemoryLimit`.
    pub max_memory_bytes: usize,
}
92
/// Describes one host directory mapping offered to the guest as a WASI preopen.
#[derive(Clone, Debug)]
pub struct WasiPreopen {
    /// Path on the host side of the mapping.
    pub host_path: PathBuf,
    /// Path under which the guest sees the mapping.
    pub guest_path: String,
    /// Read-only flag; `false` unless changed through [`WasiPreopen::read_only`].
    pub read_only: bool,
}

impl WasiPreopen {
    /// Creates a mapping from `host_path` to `guest_path` with `read_only` set to `false`.
    pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
        let host_path: PathBuf = host_path.into();
        let guest_path: String = guest_path.into();
        WasiPreopen {
            host_path,
            guest_path,
            read_only: false,
        }
    }

    /// Builder-style setter: returns the mapping with `read_only` replaced by `value`.
    pub fn read_only(self, value: bool) -> Self {
        WasiPreopen {
            read_only: value,
            ..self
        }
    }
}
114
/// A loaded component plus the in-memory host environment used to invoke it repeatedly.
///
/// Built by `TestHarness::new`; each `invoke` call creates a fresh store from these parts.
pub struct TestHarness {
    // Engine shared by every invocation; its epoch is bumped by the timeout watchdog.
    engine: Engine,
    // Compiled component and linker, used directly when instantiating a v0.6 component.
    component: Component,
    linker: Linker<HostState>,
    // Pre-linked instance and export indices, used when instantiating a v0.5 component.
    instance_pre: InstancePre<HostState>,
    // `Some` only when `abi` is `ComponentAbi::V0_5`.
    guest_indices: Option<GuestIndices>,
    abi: ComponentAbi,
    // In-memory stores shared (via `Arc`) with the host state of every invocation.
    state_store: Arc<InMemoryStateStore>,
    secrets_store: Arc<InMemorySecretsStore>,
    // Scope derived from the tenant context and the configured state prefix.
    state_scope: StateScope,
    allow_state_read: bool,
    allow_state_write: bool,
    allow_state_delete: bool,
    // Original tenant context; converted to the v0.6 shape on each v0.6 invocation.
    tenant_ctx: TenantCtx,
    // Pre-built v0.5 execution context (tenant, flow id, node id).
    exec_ctx: node::ExecCtx,
    wasi_preopens: Vec<WasiPreopen>,
    // Component configuration already serialized to JSON, if one was supplied.
    config_json: Option<String>,
    allow_http: bool,
    timeout_ms: u64,
    max_memory_bytes: usize,
    // "len=…, blake3:…" description of the wasm binary, included in instantiation errors.
    wasm_bytes_metadata: String,
}
137
/// Result of a successful `TestHarness::invoke` call.
pub struct InvokeOutcome {
    /// Component output as a JSON string.
    pub output_json: String,
    /// Milliseconds spent instantiating the component for this call.
    pub instantiate_ms: u64,
    /// Milliseconds spent inside the component's `invoke` export.
    pub run_ms: u64,
}
143
144impl TestHarness {
145    pub fn new(config: HarnessConfig) -> Result<Self> {
146        let mut wasmtime_config = Config::new();
147        wasmtime_config.wasm_component_model(true);
148        wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
149        wasmtime_config.epoch_interruption(true);
150        let engine = Engine::new(&wasmtime_config)
151            .map_err(|err| anyhow::anyhow!("create wasmtime engine: {err}"))?;
152
153        let component = Component::from_binary(&engine, &config.wasm_bytes)
154            .map_err(|err| anyhow::anyhow!("load component wasm: {err}"))?;
155        let wasm_bytes_metadata = describe_wasm_metadata(&config.wasm_bytes);
156        let abi = detect_component_abi(&config.wasm_bytes);
157
158        let linker = build_linker(&engine)?;
159        let instance_pre = linker.instantiate_pre(&component).map_err(|err| {
160            anyhow::anyhow!(
161                "prepare component instance (wasm metadata: {}): {}",
162                wasm_bytes_metadata,
163                err
164            )
165        })?;
166        let guest_indices = if abi == ComponentAbi::V0_5 {
167            Some(GuestIndices::new(&instance_pre).map_err(|err| {
168                anyhow::anyhow!(
169                    "load guest indices (wasm metadata: {}): {}",
170                    wasm_bytes_metadata,
171                    err
172                )
173            })?)
174        } else {
175            None
176        };
177
178        let state_store = Arc::new(InMemoryStateStore::new());
179        let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
180        let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
181        let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
182        for (key, value) in config.state_seeds {
183            state_store.write(&scope, &key, value);
184        }
185
186        let exec_ctx = node::ExecCtx {
187            tenant: make_component_tenant_ctx(&config.tenant_ctx),
188            i18n_id: config.tenant_ctx.i18n_id.clone(),
189            flow_id: config.flow_id,
190            node_id: config.node_id,
191        };
192
193        let config_json = match config.config {
194            Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
195            None => None,
196        };
197
198        Ok(Self {
199            engine,
200            component,
201            linker,
202            instance_pre,
203            guest_indices,
204            abi,
205            state_store,
206            secrets_store,
207            state_scope: scope,
208            allow_state_read: config.allow_state_read,
209            allow_state_write: config.allow_state_write,
210            allow_state_delete: config.allow_state_delete,
211            tenant_ctx: config.tenant_ctx,
212            exec_ctx,
213            wasi_preopens: config.wasi_preopens,
214            config_json,
215            allow_http: config.allow_http,
216            timeout_ms: config.timeout_ms,
217            max_memory_bytes: config.max_memory_bytes,
218            wasm_bytes_metadata,
219        })
220    }
221
222    pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
223        let host_state = HostState::new(HostStateConfig {
224            base_scope: self.state_scope.clone(),
225            state_store: self.state_store.clone(),
226            secrets: self.secrets_store.clone(),
227            allow_state_read: self.allow_state_read,
228            allow_state_write: self.allow_state_write,
229            allow_state_delete: self.allow_state_delete,
230            wasi_preopens: self.wasi_preopens.clone(),
231            allow_http: self.allow_http,
232            config_json: self.config_json.clone(),
233            max_memory_bytes: self.max_memory_bytes,
234        })
235        .context("build WASI context")?;
236        let mut store = Store::new(&self.engine, host_state);
237        store.limiter(|state| state.limits_mut());
238        store.set_epoch_deadline(1);
239
240        let done = Arc::new(AtomicBool::new(false));
241        let _timeout_guard = TimeoutGuard::new(done.clone());
242        let engine = self.engine.clone();
243        let timeout_ms = self.timeout_ms;
244        std::thread::spawn(move || {
245            std::thread::sleep(Duration::from_millis(timeout_ms));
246            if !done.load(Ordering::Relaxed) {
247                engine.increment_epoch();
248            }
249        });
250
251        let instantiate_start = Instant::now();
252        match self.abi {
253            ComponentAbi::V0_5 => {
254                let guest_indices = self
255                    .guest_indices
256                    .as_ref()
257                    .context("missing v0.5 guest indices")?;
258                let instance = self
259                    .instance_pre
260                    .instantiate(&mut store)
261                    .map_err(|err| anyhow::anyhow!("instantiate component: {err}"))
262                    .and_then(|instance| {
263                        guest_indices
264                            .load(&mut store, &instance)
265                            .map_err(|err| anyhow::anyhow!("load component exports: {err}"))
266                            .map(|exports| (instance, exports))
267                    })
268                    .with_context(|| {
269                        format!(
270                            "failed to prepare component instance (wasm metadata: {})",
271                            self.wasm_bytes_metadata
272                        )
273                    });
274
275                let (_instance, exports) = match instance {
276                    Ok(value) => value,
277                    Err(err) => {
278                        return map_invoke_error(
279                            err,
280                            &store,
281                            self.timeout_ms,
282                            self.max_memory_bytes,
283                        );
284                    }
285                };
286                let instantiate_ms = duration_ms(instantiate_start.elapsed());
287
288                let input = serde_json::to_string(input_json).context("serialize input json")?;
289                let run_start = Instant::now();
290                let result = exports
291                    .call_invoke(&mut store, &self.exec_ctx, operation, &input)
292                    .map_err(|err| anyhow::anyhow!("invoke component: {err}"));
293
294                use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::InvokeResult;
295
296                let result = match result {
297                    Ok(result) => result,
298                    Err(err) => {
299                        return map_invoke_error(
300                            err,
301                            &store,
302                            self.timeout_ms,
303                            self.max_memory_bytes,
304                        );
305                    }
306                };
307                let run_ms = duration_ms(run_start.elapsed());
308
309                match result {
310                    InvokeResult::Ok(output_json) => Ok(InvokeOutcome {
311                        output_json,
312                        instantiate_ms,
313                        run_ms,
314                    }),
315                    InvokeResult::Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
316                        code: err.code,
317                        message: err.message,
318                        retryable: err.retryable,
319                        backoff_ms: err.backoff_ms,
320                        details: err.details,
321                    })),
322                }
323            }
324            ComponentAbi::V0_6 => {
325                let exports = component_v0_6::ComponentV0V6V0::instantiate(
326                    &mut store,
327                    &self.component,
328                    &self.linker,
329                )
330                .map_err(|err| anyhow::anyhow!("instantiate component: {err}"))
331                .with_context(|| {
332                    format!(
333                        "failed to prepare component instance (wasm metadata: {})",
334                        self.wasm_bytes_metadata
335                    )
336                });
337                let exports = match exports {
338                    Ok(value) => value,
339                    Err(err) => {
340                        return map_invoke_error(
341                            err,
342                            &store,
343                            self.timeout_ms,
344                            self.max_memory_bytes,
345                        );
346                    }
347                };
348                let instantiate_ms = duration_ms(instantiate_start.elapsed());
349
350                let mut payload = input_json.clone();
351                if !payload.is_object() {
352                    payload = serde_json::json!({ "input": payload });
353                }
354                if let Some(object) = payload.as_object_mut()
355                    && !object.contains_key("operation")
356                {
357                    object.insert(
358                        "operation".to_string(),
359                        Value::String(operation.to_string()),
360                    );
361                }
362
363                let input = canonical::to_canonical_cbor_allow_floats(&payload)
364                    .context("encode invoke payload to cbor")?;
365                let invoke_envelope =
366                    component_v0_6::exports::greentic::component::node::InvocationEnvelope {
367                        ctx: make_component_tenant_ctx_v0_6(&self.tenant_ctx),
368                        flow_id: self.exec_ctx.flow_id.clone(),
369                        step_id: self
370                            .exec_ctx
371                            .node_id
372                            .clone()
373                            .unwrap_or_else(|| operation.to_string()),
374                        component_id: self
375                            .exec_ctx
376                            .node_id
377                            .clone()
378                            .unwrap_or_else(|| "component".to_string()),
379                        attempt: self.tenant_ctx.attempt,
380                        payload_cbor: input,
381                        metadata_cbor: None,
382                    };
383
384                let run_start = Instant::now();
385                let result = exports
386                    .greentic_component_node()
387                    .call_invoke(&mut store, operation, &invoke_envelope)
388                    .map_err(|err| anyhow::anyhow!("invoke component: {err}"));
389                let result = match result {
390                    Ok(value) => value,
391                    Err(err) => {
392                        return map_invoke_error(
393                            err,
394                            &store,
395                            self.timeout_ms,
396                            self.max_memory_bytes,
397                        );
398                    }
399                };
400                let run_ms = duration_ms(run_start.elapsed());
401                match result {
402                    Ok(result) => {
403                        let output_value: Value = canonical::from_cbor(&result.output_cbor)
404                            .context("decode invoke output cbor")?;
405                        let output_json = serde_json::to_string(&output_value)
406                            .context("serialize invoke output json")?;
407                        Ok(InvokeOutcome {
408                            output_json,
409                            instantiate_ms,
410                            run_ms,
411                        })
412                    }
413                    Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
414                        code: err.code,
415                        message: err.message,
416                        retryable: err.retryable,
417                        backoff_ms: err.backoff_ms,
418                        details: err
419                            .details
420                            .as_ref()
421                            .and_then(|bytes| canonical::from_cbor::<Value>(bytes).ok())
422                            .and_then(|value| serde_json::to_string(&value).ok()),
423                    })),
424                }
425            }
426        }
427    }
428
    /// Returns the entries reported by the in-memory state store's `dump`.
    pub fn state_dump(&self) -> Vec<StateDumpEntry> {
        self.state_store.dump()
    }
432}
433
434fn make_component_tenant_ctx(tenant: &TenantCtx) -> node::TenantCtx {
435    node::TenantCtx {
436        tenant: tenant.tenant.as_str().to_string(),
437        team: tenant.team.as_ref().map(|t| t.as_str().to_string()),
438        user: tenant.user.as_ref().map(|u| u.as_str().to_string()),
439        trace_id: tenant.trace_id.clone(),
440        i18n_id: tenant.i18n_id.clone(),
441        correlation_id: tenant.correlation_id.clone(),
442        deadline_unix_ms: tenant
443            .deadline
444            .and_then(|deadline| u64::try_from(deadline.unix_millis()).ok()),
445        attempt: tenant.attempt,
446        idempotency_key: tenant.idempotency_key.clone(),
447    }
448}
449
450fn make_component_tenant_ctx_v0_6(
451    tenant: &TenantCtx,
452) -> component_v0_6::exports::greentic::component::node::TenantCtx {
453    component_v0_6::exports::greentic::component::node::TenantCtx {
454        tenant_id: tenant.tenant_id.as_str().to_string(),
455        team_id: tenant.team_id.as_ref().map(|t| t.as_str().to_string()),
456        user_id: tenant.user_id.as_ref().map(|u| u.as_str().to_string()),
457        env_id: tenant.env.as_str().to_string(),
458        trace_id: tenant
459            .trace_id
460            .clone()
461            .unwrap_or_else(|| "trace-local".to_string()),
462        correlation_id: tenant
463            .correlation_id
464            .clone()
465            .unwrap_or_else(|| "corr-local".to_string()),
466        deadline_ms: tenant
467            .deadline
468            .and_then(|deadline| u64::try_from(deadline.unix_millis()).ok())
469            .unwrap_or(0),
470        attempt: tenant.attempt,
471        idempotency_key: tenant.idempotency_key.clone(),
472        i18n_id: tenant.i18n_id.clone().unwrap_or_else(|| "en".to_string()),
473    }
474}
475
/// Drop guard that raises a shared flag when it goes out of scope.
///
/// `invoke` hands a clone of the flag to its watchdog thread, which skips the epoch bump
/// once the flag is set.
struct TimeoutGuard {
    done: Arc<AtomicBool>,
}

impl TimeoutGuard {
    /// Wraps `done`; the flag is left untouched until the guard is dropped.
    fn new(done: Arc<AtomicBool>) -> Self {
        TimeoutGuard { done }
    }
}

impl Drop for TimeoutGuard {
    /// Sets the flag to `true`.
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.done;
        flag.store(true, Ordering::Relaxed);
    }
}
491
492fn is_timeout_error(err: &anyhow::Error) -> bool {
493    err.chain()
494        .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
495        .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
496}
497
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}
501
502fn map_invoke_error(
503    err: anyhow::Error,
504    store: &Store<HostState>,
505    timeout_ms: u64,
506    max_memory_bytes: usize,
507) -> Result<InvokeOutcome> {
508    if is_timeout_error(&err) {
509        return Err(anyhow::Error::new(HarnessError::Timeout { timeout_ms }));
510    }
511    if store.data().memory_limit_hit() {
512        return Err(anyhow::Error::new(HarnessError::MemoryLimit {
513            max_memory_bytes,
514        }));
515    }
516    Err(err)
517}
518
519fn detect_component_abi(bytes: &[u8]) -> ComponentAbi {
520    if let Ok(decoded) = crate::wasm::decode_world(bytes) {
521        let world = &decoded.resolve.worlds[decoded.world];
522        if world.name == "component-v0-v5-v0" {
523            return ComponentAbi::V0_5;
524        }
525    }
526    ComponentAbi::V0_6
527}
528
529fn describe_wasm_metadata(bytes: &[u8]) -> String {
530    let mut hasher = Hasher::new();
531    hasher.update(bytes);
532    format!("len={}, blake3:{}", bytes.len(), hasher.finalize().to_hex())
533}