Skip to main content

greentic_component/test_harness/
mod.rs

1use std::collections::{HashMap, HashSet};
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::{Duration, Instant};
6
7use anyhow::{Context, Result};
8use greentic_interfaces_host::component::v0_5::exports::greentic::component::node;
9use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::GuestIndices;
10use greentic_types::TenantCtx;
11use serde_json::Value;
12use wasmtime::component::{Component, InstancePre};
13use wasmtime::{Config, Engine, Store};
14
15use crate::test_harness::linker::{HostState, HostStateConfig, build_linker};
16use crate::test_harness::secrets::InMemorySecretsStore;
17use crate::test_harness::state::{InMemoryStateStore, StateDumpEntry, StateScope};
18
19mod linker;
20mod secrets;
21mod state;
22
/// Error returned by the component itself through the error arm of its invoke result.
///
/// `TestHarness::invoke` copies the fields from the component's error payload and
/// wraps the value in an `anyhow::Error`, so callers can recover it by downcasting.
#[derive(Debug)]
pub struct ComponentInvokeError {
    /// Error code as reported by the component.
    pub code: String,
    /// Error message as reported by the component.
    pub message: String,
    /// Retry flag as reported by the component.
    pub retryable: bool,
    /// Optional backoff value as reported by the component.
    pub backoff_ms: Option<u64>,
    /// Optional detail text; it is not part of the `Display` output.
    pub details: Option<String>,
}

impl std::fmt::Display for ComponentInvokeError {
    /// Formats the error as `component error <code>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "component error {}: {}",
            self.code, self.message
        ))
    }
}

impl std::error::Error for ComponentInvokeError {}
39
/// Failures raised by the harness rather than by the component's own error result.
#[derive(Debug)]
pub enum HarnessError {
    /// Execution was interrupted; `timeout_ms` is the limit that was configured.
    Timeout { timeout_ms: u64 },
    /// The memory limit was hit; `max_memory_bytes` is the limit that was configured.
    MemoryLimit { max_memory_bytes: usize },
}

impl std::fmt::Display for HarnessError {
    /// Writes a one-line message naming the limit that was exceeded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both payloads are `Copy`, so matching on the dereferenced value binds them by value.
        match *self {
            Self::Timeout { timeout_ms } => {
                write!(f, "execution exceeded timeout of {timeout_ms}ms")
            }
            Self::MemoryLimit { max_memory_bytes } => write!(
                f,
                "execution exceeded memory limit of {max_memory_bytes} bytes"
            ),
        }
    }
}

impl std::error::Error for HarnessError {}
63
64pub struct HarnessConfig {
65    pub wasm_bytes: Vec<u8>,
66    pub tenant_ctx: TenantCtx,
67    pub flow_id: String,
68    pub node_id: Option<String>,
69    pub state_prefix: String,
70    pub state_seeds: Vec<(String, Vec<u8>)>,
71    pub allow_state_read: bool,
72    pub allow_state_write: bool,
73    pub allow_state_delete: bool,
74    pub allow_secrets: bool,
75    pub allowed_secrets: HashSet<String>,
76    pub secrets: HashMap<String, String>,
77    pub wasi_preopens: Vec<WasiPreopen>,
78    pub config: Option<Value>,
79    pub allow_http: bool,
80    pub timeout_ms: u64,
81    pub max_memory_bytes: usize,
82}
83
/// Description of one WASI preopen handed to the host state.
#[derive(Clone, Debug)]
pub struct WasiPreopen {
    /// Path on the host side of the mapping.
    pub host_path: PathBuf,
    /// Path on the guest side of the mapping.
    pub guest_path: String,
    /// Read-only flag; [`WasiPreopen::new`] initializes it to `false`.
    pub read_only: bool,
}

impl WasiPreopen {
    /// Creates a preopen for the given host/guest path pair with `read_only` unset.
    pub fn new(host_path: impl Into<PathBuf>, guest_path: impl Into<String>) -> Self {
        let host_path = host_path.into();
        let guest_path = guest_path.into();
        Self {
            host_path,
            guest_path,
            read_only: false,
        }
    }

    /// Builder-style setter: returns the preopen with `read_only` replaced by `value`.
    pub fn read_only(self, value: bool) -> Self {
        Self {
            read_only: value,
            ..self
        }
    }
}
105
106pub struct TestHarness {
107    engine: Engine,
108    instance_pre: InstancePre<HostState>,
109    guest_indices: GuestIndices,
110    state_store: Arc<InMemoryStateStore>,
111    secrets_store: Arc<InMemorySecretsStore>,
112    state_scope: StateScope,
113    allow_state_read: bool,
114    allow_state_write: bool,
115    allow_state_delete: bool,
116    exec_ctx: node::ExecCtx,
117    wasi_preopens: Vec<WasiPreopen>,
118    config_json: Option<String>,
119    allow_http: bool,
120    timeout_ms: u64,
121    max_memory_bytes: usize,
122}
123
/// Result of a successful `TestHarness::invoke` call.
///
/// Derives `Debug` (plus `Clone`/`PartialEq`/`Eq`) so that `Result<InvokeOutcome>`
/// works with `unwrap_err`/`expect_err` and outcomes can be compared in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InvokeOutcome {
    /// JSON string returned by the component's success result.
    pub output_json: String,
    /// Milliseconds spent instantiating the component and loading its exports.
    pub instantiate_ms: u64,
    /// Milliseconds spent inside the component's invoke call.
    pub run_ms: u64,
}
129
130impl TestHarness {
131    pub fn new(config: HarnessConfig) -> Result<Self> {
132        let mut wasmtime_config = Config::new();
133        wasmtime_config.wasm_component_model(true);
134        wasmtime_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
135        wasmtime_config.epoch_interruption(true);
136        let engine = Engine::new(&wasmtime_config).context("create wasmtime engine")?;
137
138        let component =
139            Component::from_binary(&engine, &config.wasm_bytes).context("load component wasm")?;
140
141        let linker = build_linker(&engine)?;
142        let instance_pre = linker
143            .instantiate_pre(&component)
144            .context("prepare component instance")?;
145        let guest_indices = GuestIndices::new(&instance_pre).context("load guest indices")?;
146
147        let state_store = Arc::new(InMemoryStateStore::new());
148        let secrets_store = InMemorySecretsStore::new(config.allow_secrets, config.allowed_secrets);
149        let secrets_store = Arc::new(secrets_store.with_secrets(config.secrets));
150        let scope = StateScope::from_tenant_ctx(&config.tenant_ctx, config.state_prefix);
151        for (key, value) in config.state_seeds {
152            state_store.write(&scope, &key, value);
153        }
154
155        let exec_ctx = node::ExecCtx {
156            tenant: make_component_tenant_ctx(&config.tenant_ctx),
157            i18n_id: config.tenant_ctx.i18n_id.clone(),
158            flow_id: config.flow_id,
159            node_id: config.node_id,
160        };
161
162        let config_json = match config.config {
163            Some(value) => Some(serde_json::to_string(&value).context("serialize config json")?),
164            None => None,
165        };
166
167        Ok(Self {
168            engine,
169            instance_pre,
170            guest_indices,
171            state_store,
172            secrets_store,
173            state_scope: scope,
174            allow_state_read: config.allow_state_read,
175            allow_state_write: config.allow_state_write,
176            allow_state_delete: config.allow_state_delete,
177            exec_ctx,
178            wasi_preopens: config.wasi_preopens,
179            config_json,
180            allow_http: config.allow_http,
181            timeout_ms: config.timeout_ms,
182            max_memory_bytes: config.max_memory_bytes,
183        })
184    }
185
186    pub fn invoke(&self, operation: &str, input_json: &Value) -> Result<InvokeOutcome> {
187        let host_state = HostState::new(HostStateConfig {
188            base_scope: self.state_scope.clone(),
189            state_store: self.state_store.clone(),
190            secrets: self.secrets_store.clone(),
191            allow_state_read: self.allow_state_read,
192            allow_state_write: self.allow_state_write,
193            allow_state_delete: self.allow_state_delete,
194            wasi_preopens: self.wasi_preopens.clone(),
195            allow_http: self.allow_http,
196            config_json: self.config_json.clone(),
197            max_memory_bytes: self.max_memory_bytes,
198        })
199        .context("build WASI context")?;
200        let mut store = Store::new(&self.engine, host_state);
201        store.limiter(|state| state.limits_mut());
202        store.set_epoch_deadline(1);
203
204        let done = Arc::new(AtomicBool::new(false));
205        let _timeout_guard = TimeoutGuard::new(done.clone());
206        let engine = self.engine.clone();
207        let timeout_ms = self.timeout_ms;
208        std::thread::spawn(move || {
209            std::thread::sleep(Duration::from_millis(timeout_ms));
210            if !done.load(Ordering::Relaxed) {
211                engine.increment_epoch();
212            }
213        });
214
215        let instantiate_start = Instant::now();
216        let instance = self
217            .instance_pre
218            .instantiate(&mut store)
219            .context("instantiate component")
220            .and_then(|instance| {
221                self.guest_indices
222                    .load(&mut store, &instance)
223                    .context("load component exports")
224                    .map(|exports| (instance, exports))
225            });
226
227        let (_instance, exports) = match instance {
228            Ok(value) => value,
229            Err(err) => {
230                if is_timeout_error(&err) {
231                    return Err(anyhow::Error::new(HarnessError::Timeout {
232                        timeout_ms: self.timeout_ms,
233                    }));
234                }
235                if store.data().memory_limit_hit() {
236                    return Err(anyhow::Error::new(HarnessError::MemoryLimit {
237                        max_memory_bytes: self.max_memory_bytes,
238                    }));
239                }
240                return Err(err);
241            }
242        };
243        let instantiate_ms = duration_ms(instantiate_start.elapsed());
244
245        let input = serde_json::to_string(input_json).context("serialize input json")?;
246        let run_start = Instant::now();
247        let result = exports
248            .call_invoke(&mut store, &self.exec_ctx, operation, &input)
249            .context("invoke component");
250
251        use greentic_interfaces_host::component::v0_5::exports::greentic::component::node::InvokeResult;
252
253        let result = match result {
254            Ok(result) => result,
255            Err(err) => {
256                if is_timeout_error(&err) {
257                    return Err(anyhow::Error::new(HarnessError::Timeout {
258                        timeout_ms: self.timeout_ms,
259                    }));
260                }
261                if store.data().memory_limit_hit() {
262                    return Err(anyhow::Error::new(HarnessError::MemoryLimit {
263                        max_memory_bytes: self.max_memory_bytes,
264                    }));
265                }
266                return Err(err);
267            }
268        };
269        let run_ms = duration_ms(run_start.elapsed());
270
271        match result {
272            InvokeResult::Ok(output_json) => Ok(InvokeOutcome {
273                output_json,
274                instantiate_ms,
275                run_ms,
276            }),
277            InvokeResult::Err(err) => Err(anyhow::Error::new(ComponentInvokeError {
278                code: err.code,
279                message: err.message,
280                retryable: err.retryable,
281                backoff_ms: err.backoff_ms,
282                details: err.details,
283            })),
284        }
285    }
286
287    pub fn state_dump(&self) -> Vec<StateDumpEntry> {
288        self.state_store.dump()
289    }
290}
291
292fn make_component_tenant_ctx(tenant: &TenantCtx) -> node::TenantCtx {
293    node::TenantCtx {
294        tenant: tenant.tenant.as_str().to_string(),
295        team: tenant.team.as_ref().map(|t| t.as_str().to_string()),
296        user: tenant.user.as_ref().map(|u| u.as_str().to_string()),
297        trace_id: tenant.trace_id.clone(),
298        i18n_id: tenant.i18n_id.clone(),
299        correlation_id: tenant.correlation_id.clone(),
300        deadline_unix_ms: tenant.deadline.and_then(|deadline| {
301            let millis = deadline.unix_millis();
302            if millis >= 0 {
303                u64::try_from(millis).ok()
304            } else {
305                None
306            }
307        }),
308        attempt: tenant.attempt,
309        idempotency_key: tenant.idempotency_key.clone(),
310    }
311}
312
/// Drop guard that flags an invocation as finished.
///
/// The shared flag is set to `true` when the guard is dropped, whichever way the
/// owning scope is left.
struct TimeoutGuard {
    /// Flag shared with whoever needs to observe completion.
    done: Arc<AtomicBool>,
}

impl TimeoutGuard {
    /// Wraps the shared flag; the flag is left untouched until the guard is dropped.
    fn new(done: Arc<AtomicBool>) -> Self {
        TimeoutGuard { done }
    }
}

impl Drop for TimeoutGuard {
    /// Marks the flag as set.
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.done;
        flag.store(true, Ordering::Relaxed);
    }
}
328
329fn is_timeout_error(err: &anyhow::Error) -> bool {
330    err.chain()
331        .find_map(|source| source.downcast_ref::<wasmtime::Trap>())
332        .is_some_and(|trap| matches!(trap, wasmtime::Trap::Interrupt))
333}
334
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX` when the
/// value does not fit.
fn duration_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}