Skip to main content

greentic_component/test_harness/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use blake3::Hasher;
9use greentic_interfaces_host::component::v0_5::exports::greentic::component::node;
10use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::GuestIndices;
11use greentic_types::TenantCtx;
12use serde_json::Value;
13use wasmtime::component::{Component, InstancePre};
14use wasmtime::{Config, Engine, Store};
15
16use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
17use crate::test_harness::secrets::InMemorySecretsStore;
18use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
19
20mod linker;
21mod secrets;
22mod state;
23
24#[derive(Debug)]
25pub struct ComponentInvokeError {
26    pub code: String,
27    pub message: String,
28    pub retryable: bool,
29    pub backoff_ms: Option<u64>,
30    pub details: Option<String>,
31}
32
33impl std::fmt::Display for ComponentInvokeError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "component error {}: {}", self.code, self.message)
36    }
37}
38
39impl std::error::Error for ComponentInvokeError {}
40
41#[derive(Debug)]
42pub enum HarnessError {
43    Timeout { timeout_ms: u64 },
44    MemoryLimit { max_memory_bytes: usize },
45}
46
47impl std::fmt::Display for HarnessError {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            HarnessError::Timeout { timeout_ms } => {
51                write!(f, "execution exceeded timeout of {timeout_ms}ms")
52            }
53            HarnessError::MemoryLimit { max_memory_bytes } => {
54                write!(
55                    f,
56                    "execution exceeded memory limit of {max_memory_bytes} bytes"
57                )
58            }
59        }
60    }
61}
62
63impl std::error::Error for HarnessError {}
64
65pub struct HarnessConfig {
66    pub wasm_bytes: Vec<u8>,
67    pub tenant_ctx: TenantCtx,
68    pub flow_id: String,
69    pub node_id: Option<String>,
70    pub state_prefix: String,
71    pub state_seeds: Vec<(String, Vec<u8>)>,
72    pub allow_state_read: bool,
73    pub allow_state_write: bool,
74    pub allow_state_delete: bool,
75    pub allow_secrets: bool,
76    pub allowed_secrets: HashSet<String>,
77    pub secrets: HashMap<String, String>,
78    pub wasi_preopens: Vec<WasiPreopen>,
79    pub config: Option<Value>,
80    pub allow_http: bool,
81    pub timeout_ms: u64,
82    pub max_memory_bytes: usize,
83}
84
85#[derive(Clone, Debug)]
86pub struct WasiPreopen {
87    pub host_path: PathBuf,
88    pub guest_path: String,
89    pub read_only: bool,
90}
91
92impl WasiPreopen {
93    pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
94        Self {
95            host_path: host_path.into(),
96            guest_path: guest_path.into(),
97            read_only: false,
98        }
99    }
100
101    pub fn read_only(mut self, value: bool) -> Self {
102        self.read_only = value;
103        self
104    }
105}
106
107pub struct TestHarness {
108    engine: Engine,
109    instance_pre: InstancePre<HostState>,
110    guest_indices: GuestIndices,
111    state_store: Arc<InMemoryStateStore>,
112    secrets_store: Arc<InMemorySecretsStore>,
113    state_scope: StateScope,
114    allow_state_read: bool,
115    allow_state_write: bool,
116    allow_state_delete: bool,
117    exec_ctx: node::ExecCtx,
118    wasi_preopens: Vec<WasiPreopen>,
119    config_json: Option<String>,
120    allow_http: bool,
121    timeout_ms: u64,
122    max_memory_bytes: usize,
123    wasm_bytes_metadata: String,
124}
125
126pub struct InvokeOutcome {
127    pub output_json: String,
128    pub instantiate_ms: u64,
129    pub run_ms: u64,
130}
131
132impl TestHarness {
133    pub fn new(config: HarnessConfig) -> Result<Self> {
134        let mut wasmtime_config = Config::new();
135        wasmtime_config.wasm_component_model(true);
136        wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
137        wasmtime_config.epoch_interruption(true);
138        let engine = Engine::new(&wasmtime_config).context("create wasmtime engine")?;
139
140        let component =
141            Component::from_binary(&engine, &config.wasm_bytes).context("load component wasm")?;
142        let wasm_bytes_metadata = describe_wasm_metadata(&config.wasm_bytes);
143
144        let linker = build_linker(&engine)?;
145        let instance_pre = linker
146            .instantiate_pre(&component)
147            .map_err(|err| {
148                eprintln!(
149                    "Linker::instantiate_pre failed ({}): {err}",
150                    wasm_bytes_metadata
151                );
152                for source in err.chain().skip(1) {
153                    eprintln!("  cause: {source}");
154                }
155                err
156            })
157            .with_context(|| {
158                format!(
159                    "prepare component instance (wasm metadata: {})",
160                    wasm_bytes_metadata
161                )
162            })?;
163        let guest_indices = GuestIndices::new(&instance_pre)
164            .map_err(|err| {
165                eprintln!("GuestIndices::new failed ({}): {err}", wasm_bytes_metadata);
166                for source in err.chain().skip(1) {
167                    eprintln!("  cause: {source}");
168                }
169                err
170            })
171            .with_context(|| {
172                format!(
173                    "load guest indices (wasm metadata: {})",
174                    wasm_bytes_metadata
175                )
176            })?;
177
178        let state_store = Arc::new(InMemoryStateStore::new());
179        let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
180        let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
181        let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
182        for (key, value) in config.state_seeds {
183            state_store.write(&scope, &key, value);
184        }
185
186        let exec_ctx = node::ExecCtx {
187            tenant: make_component_tenant_ctx(&config.tenant_ctx),
188            i18n_id: config.tenant_ctx.i18n_id.clone(),
189            flow_id: config.flow_id,
190            node_id: config.node_id,
191        };
192
193        let config_json = match config.config {
194            Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
195            None => None,
196        };
197
198        Ok(Self {
199            engine,
200            instance_pre,
201            guest_indices,
202            state_store,
203            secrets_store,
204            state_scope: scope,
205            allow_state_read: config.allow_state_read,
206            allow_state_write: config.allow_state_write,
207            allow_state_delete: config.allow_state_delete,
208            exec_ctx,
209            wasi_preopens: config.wasi_preopens,
210            config_json,
211            allow_http: config.allow_http,
212            timeout_ms: config.timeout_ms,
213            max_memory_bytes: config.max_memory_bytes,
214            wasm_bytes_metadata,
215        })
216    }
217
218    pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
219        let host_state = HostState::new(HostStateConfig {
220            base_scope: self.state_scope.clone(),
221            state_store: self.state_store.clone(),
222            secrets: self.secrets_store.clone(),
223            allow_state_read: self.allow_state_read,
224            allow_state_write: self.allow_state_write,
225            allow_state_delete: self.allow_state_delete,
226            wasi_preopens: self.wasi_preopens.clone(),
227            allow_http: self.allow_http,
228            config_json: self.config_json.clone(),
229            max_memory_bytes: self.max_memory_bytes,
230        })
231        .context("build WASI context")?;
232        let mut store = Store::new(&self.engine, host_state);
233        store.limiter(|state| state.limits_mut());
234        store.set_epoch_deadline(1);
235
236        let done = Arc::new(AtomicBool::new(false));
237        let _timeout_guard = TimeoutGuard::new(done.clone());
238        let engine = self.engine.clone();
239        let timeout_ms = self.timeout_ms;
240        std::thread::spawn(move || {
241            std::thread::sleep(Duration::from_millis(timeout_ms));
242            if !done.load(Ordering::Relaxed) {
243                engine.increment_epoch();
244            }
245        });
246
247        let instantiate_start = Instant::now();
248        let instance = self
249            .instance_pre
250            .instantiate(&mut store)
251            .context("instantiate component")
252            .and_then(|instance| {
253                self.guest_indices
254                    .load(&mut store, &instance)
255                    .context("load component exports")
256                    .map(|exports| (instance, exports))
257            })
258            .with_context(|| {
259                format!(
260                    "failed to prepare component instance (wasm metadata: {})",
261                    self.wasm_bytes_metadata
262                )
263            });
264
265        let (_instance, exports) = match instance {
266            Ok(value) => value,
267            Err(err) => {
268                if is_timeout_error(&err) {
269                    return Err(anyhow::Error::new(HarnessError::Timeout {
270                        timeout_ms: self.timeout_ms,
271                    }));
272                }
273                if store.data().memory_limit_hit() {
274                    return Err(anyhow::Error::new(HarnessError::MemoryLimit {
275                        max_memory_bytes: self.max_memory_bytes,
276                    }));
277                }
278                return Err(err);
279            }
280        };
281        let instantiate_ms = duration_ms(instantiate_start.elapsed());
282
283        let input = serde_json::to_string(input_json).context("serialize input json")?;
284        let run_start = Instant::now();
285        let result = exports
286            .call_invoke(&mut store, &self.exec_ctx, operation, &input)
287            .context("invoke component");
288
289        use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::InvokeResult;
290
291        let result = match result {
292            Ok(result) => result,
293            Err(err) => {
294                if is_timeout_error(&err) {
295                    return Err(anyhow::Error::new(HarnessError::Timeout {
296                        timeout_ms: self.timeout_ms,
297                    }));
298                }
299                if store.data().memory_limit_hit() {
300                    return Err(anyhow::Error::new(HarnessError::MemoryLimit {
301                        max_memory_bytes: self.max_memory_bytes,
302                    }));
303                }
304                return Err(err);
305            }
306        };
307        let run_ms = duration_ms(run_start.elapsed());
308
309        match result {
310            InvokeResult::Ok(output_json) => Ok(InvokeOutcome {
311                output_json,
312                instantiate_ms,
313                run_ms,
314            }),
315            InvokeResult::Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
316                code: err.code,
317                message: err.message,
318                retryable: err.retryable,
319                backoff_ms: err.backoff_ms,
320                details: err.details,
321            })),
322        }
323    }
324
325    pub fn state_dump(&self) -> Vec<StateDumpEntry> {
326        self.state_store.dump()
327    }
328}
329
330fn make_component_tenant_ctx(tenant: &TenantCtx) -> node::TenantCtx {
331    node::TenantCtx {
332        tenant: tenant.tenant.as_str().to_string(),
333        team: tenant.team.as_ref().map(|t| t.as_str().to_string()),
334        user: tenant.user.as_ref().map(|u| u.as_str().to_string()),
335        trace_id: tenant.trace_id.clone(),
336        i18n_id: tenant.i18n_id.clone(),
337        correlation_id: tenant.correlation_id.clone(),
338        deadline_unix_ms: tenant.deadline.and_then(|deadline| {
339            let millis = deadline.unix_millis();
340            if millis >= 0 {
341                u64::try_from(millis).ok()
342            } else {
343                None
344            }
345        }),
346        attempt: tenant.attempt,
347        idempotency_key: tenant.idempotency_key.clone(),
348    }
349}
350
351struct TimeoutGuard {
352    done: Arc<AtomicBool>,
353}
354
355impl TimeoutGuard {
356    fn new(done: Arc<AtomicBool>) -> Self {
357        Self { done }
358    }
359}
360
361impl Drop for TimeoutGuard {
362    fn drop(&mut self) {
363        self.done.store(true, Ordering::Relaxed);
364    }
365}
366
367fn is_timeout_error(err: &anyhow::Error) -> bool {
368    err.chain()
369        .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
370        .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
371}
372
373fn duration_ms(duration: Duration) -> u64 {
374    duration.as_millis().try_into().unwrap_or(u64::MAX)
375}
376
377fn describe_wasm_metadata(bytes: &[u8]) -> String {
378    let mut hasher = Hasher::new();
379    hasher.update(bytes);
380    format!("len={}, blake3:{}", bytes.len(), hasher.finalize().to_hex())
381}