use async_trait::async_trait;
use extism::{Manifest, PluginBuilder, UserData, Wasm};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::info;
use crate::context::CapsuleContext;
use crate::engine::ExecutionEngine;
use crate::engine::wasm::host::register_host_functions;
use crate::engine::wasm::host_state::{HostState, LifecyclePhase};
use crate::error::{CapsuleError, CapsuleResult};
use crate::manifest::CapsuleManifest;
pub mod host;
pub mod host_state;
pub(crate) mod tool;
/// Execution engine that runs a capsule as an Extism WASM plugin.
///
/// `WasmEngine::new` stores only the manifest and capsule directory; every
/// other field starts out empty and is filled in by `load`.
pub struct WasmEngine {
/// Manifest of the capsule this engine was created for.
manifest: CapsuleManifest,
/// Directory that relative component paths are joined onto in `load`.
_capsule_dir: PathBuf,
/// Built plugin; `load` stores it only for capsules without a `run` export,
/// where it serves tool and interceptor calls.
plugin: Option<Arc<Mutex<extism::Plugin>>>,
/// Inbound message receiver; `load` creates it when the manifest declares
/// uplinks and `take_inbound_rx` hands it out.
inbound_rx: Option<tokio::sync::mpsc::Receiver<astrid_core::InboundMessage>>,
/// Tools built from the manifest for capsules without a `run` export.
tools: Vec<Arc<dyn crate::tool::CapsuleTool>>,
/// Handle of the background task that calls the `run` export, if any.
run_handle: Option<tokio::task::JoinHandle<()>>,
/// Readiness signal for run-loop capsules; `wait_ready` waits for `true`.
ready_rx: Option<tokio::sync::Mutex<tokio::sync::watch::Receiver<bool>>>,
/// Token cancelled by `unload`; `load` also places a clone in `HostState`.
cancel_token: Option<tokio_util::sync::CancellationToken>,
}
impl WasmEngine {
    /// Creates an engine for `manifest` that has not loaded anything yet.
    ///
    /// `capsule_dir` is kept so component paths can be resolved later; all
    /// runtime handles start unset and the tool list starts empty.
    pub fn new(manifest: CapsuleManifest, capsule_dir: PathBuf) -> Self {
        let tools = Vec::new();
        Self {
            // Static description of the capsule.
            manifest,
            _capsule_dir: capsule_dir,
            // Runtime state, populated by `load`.
            tools,
            plugin: None,
            run_handle: None,
            cancel_token: None,
            inbound_rx: None,
            ready_rx: None,
        }
    }
}
#[async_trait]
impl ExecutionEngine for WasmEngine {
/// Loads the capsule's first WASM component as an Extism plugin and wires it
/// into the host.
///
/// In order, this method:
/// 1. resolves the first manifest component to a file path (relative paths
///    are joined onto the capsule directory);
/// 2. assembles the plugin config from `ASTRID_SOCKET_PATH` (when the Astrid
///    home resolves) and the values returned by `resolve_env`;
/// 3. inside `block_in_place`, reads the module, registers the workspace,
///    overlay and optional global VFS roots, builds `HostState` and the
///    plugin, and, for modules exporting `run`, creates the readiness channel
///    and subscribes to every manifest interceptor topic;
/// 4. registers the capsule UUID with the capsule registry when the context
///    has one;
/// 5. spawns a tool-cancel listener when `host_process` capabilities are
///    declared;
/// 6. either spawns the background `run` task, or builds the manifest tools
///    and keeps the plugin for direct calls.
///
/// # Errors
///
/// Failures detected here (missing component, unreadable module, VFS
/// registration, invalid capsule id, plugin build, `run` pre-scan mismatch,
/// interceptor validation, poisoned `HostState` lock) are reported as
/// `CapsuleError::UnsupportedEntryPoint`. An error from `resolve_env` is
/// returned through `?`.
// NOTE(review): `block_in_place` is used here and in the run task; per the
// Tokio docs it needs a multi-threaded runtime — confirm callers provide one.
async fn load(&mut self, ctx: &CapsuleContext) -> CapsuleResult<()> {
info!(
capsule = %self.manifest.package.name,
"Loading Pure WASM component"
);
// Only the first component definition is loaded.
let component = self.manifest.components.first().ok_or_else(|| {
CapsuleError::UnsupportedEntryPoint(
"WASM engine requires at least one component definition".into(),
)
})?;
let wasm_path = if component.path.is_absolute() {
component.path.clone()
} else {
self._capsule_dir.join(&component.path)
};
// Owned copies of context values, so the `move` closure below can take them.
let workspace_root = ctx.workspace_root.clone();
let kv = ctx.kv.clone();
let event_bus = astrid_events::EventBus::clone(&ctx.event_bus);
let manifest = self.manifest.clone();
// Host-provided config entries go in first; their key names are then handed
// to `resolve_env` as reserved keys.
// NOTE(review): presumably so manifest env values cannot shadow them —
// confirm in `resolve_env`.
let mut wasm_config = std::collections::HashMap::new();
if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
wasm_config.insert(
"ASTRID_SOCKET_PATH".to_string(),
serde_json::Value::String(home.socket_path().to_string_lossy().into_owned()),
);
}
let reserved_keys: Vec<String> = wasm_config.keys().cloned().collect();
let resolved_env =
super::resolve_env(&self.manifest, ctx, &reserved_keys, "wasm_engine").await?;
for (key, val) in resolved_env {
wasm_config.insert(key, serde_json::Value::String(val));
}
// A fresh UUID per load; it is stored in `HostState` and registered below.
let capsule_uuid = uuid::Uuid::new_v4();
let host_semaphore = HostState::default_host_semaphore();
// One token, cloned into `HostState`, the cancel listener and `self`.
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_token_for_state = cancel_token.clone();
// The tracker is shared between `HostState` and the cancel listener.
let process_tracker = Arc::new(crate::engine::wasm::host::process::ProcessTracker::new());
let process_tracker_for_listener = process_tracker.clone();
// Everything in this closure is synchronous (file read, `block_on`, plugin
// build), so it runs under `block_in_place`.
let (plugin, rx, has_run, ready_rx) = tokio::task::block_in_place(move || {
let wasm_bytes = std::fs::read(&wasm_path).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to read WASM: {e}"))
})?;
// The inbound channel (capacity 128) exists only when uplinks are declared.
let (tx, rx) = if !manifest.uplinks.is_empty() {
let (tx, rx) = tokio::sync::mpsc::channel(128);
(Some(tx), Some(rx))
} else {
(None, None)
};
// Lower layer maps the workspace root; upper layer maps a fresh temp dir.
// Both are registered under the same root handle.
let lower_vfs = astrid_vfs::HostVfs::new();
let upper_vfs = astrid_vfs::HostVfs::new();
let root_handle = astrid_capabilities::DirHandle::new();
let global_root = ctx.global_root.clone();
let upper_temp = tempfile::TempDir::new().map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to create overlay temp dir: {e}"
))
})?;
tokio::runtime::Handle::current()
.block_on(async {
lower_vfs
.register_dir(root_handle.clone(), workspace_root.clone())
.await?;
upper_vfs
.register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
.await?;
Ok::<(), astrid_vfs::VfsError>(())
})
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to register VFS directory: {e}"
))
})?;
// The global VFS is mounted only when a global root is configured AND the
// directory exists; otherwise both values stay `None`.
let (global_vfs, global_vfs_root_handle): (
Option<Arc<dyn astrid_vfs::Vfs>>,
Option<astrid_capabilities::DirHandle>,
) = if let Some(ref g_root) = global_root {
if g_root.exists() {
let g_vfs = astrid_vfs::HostVfs::new();
let g_handle = astrid_capabilities::DirHandle::new();
tokio::runtime::Handle::current()
.block_on(async {
g_vfs.register_dir(g_handle.clone(), g_root.clone()).await
})
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to register global VFS directory: {e}"
))
})?;
(
Some(Arc::new(g_vfs) as Arc<dyn astrid_vfs::Vfs>),
Some(g_handle),
)
} else {
tracing::warn!(
global_root = %g_root.display(),
"global:// VFS not mounted: directory does not exist. \
Capsules requesting global:// paths will receive errors \
until the directory is created and the kernel is restarted."
);
(None, None)
}
} else {
(None, None)
};
let overlay_vfs = Arc::new(astrid_vfs::OverlayVfs::new(
Box::new(lower_vfs),
Box::new(upper_vfs),
));
let next_subscription_id = 1;
// The security gate is told about the global root only when the global VFS
// was actually mounted above.
let gate_global_root = if global_vfs.is_some() {
global_root.clone()
} else {
None
};
let security_gate = Arc::new(crate::security::ManifestSecurityGate::new(
manifest.clone(),
workspace_root.clone(),
gate_global_root,
));
let secret_store = astrid_storage::build_secret_store(
&manifest.package.name,
kv.clone(),
tokio::runtime::Handle::current(),
);
let host_state = HostState {
capsule_uuid,
caller_context: None,
capsule_id: crate::capsule::CapsuleId::new(&manifest.package.name)
.map_err(|e| CapsuleError::UnsupportedEntryPoint(e.to_string()))?,
workspace_root,
// The same overlay is stored as the generic `vfs` and as `overlay_vfs`.
vfs: Arc::clone(&overlay_vfs) as Arc<dyn astrid_vfs::Vfs>,
vfs_root_handle: root_handle,
global_root,
global_vfs,
global_vfs_root_handle,
overlay_vfs: Some(overlay_vfs),
// `HostState` takes ownership of the temp dir handle.
upper_dir: Some(Arc::new(upper_temp)),
kv,
event_bus,
ipc_limiter: astrid_events::ipc::IpcRateLimiter::new(),
subscriptions: std::collections::HashMap::new(),
next_subscription_id,
config: wasm_config,
ipc_publish_patterns: manifest.capabilities.ipc_publish.clone(),
ipc_subscribe_patterns: manifest.capabilities.ipc_subscribe.clone(),
// The CLI socket listener is passed on only when `net_bind` is declared.
cli_socket_listener: if manifest.capabilities.net_bind.is_empty() {
None
} else {
ctx.cli_socket_listener.clone()
},
active_streams: std::collections::HashMap::new(),
next_stream_id: 1,
active_http_streams: std::collections::HashMap::new(),
next_http_stream_id: 1,
security: Some(security_gate),
hook_manager: None, capsule_registry: ctx.capsule_registry.clone(),
runtime_handle: tokio::runtime::Handle::current(),
has_uplink_capability: !manifest.uplinks.is_empty(),
inbound_tx: tx,
registered_uplinks: Vec::new(),
lifecycle_phase: None,
secret_store,
// Filled in below when the module exports `run`.
ready_tx: None,
host_semaphore,
cancel_token: cancel_token_for_state,
// Like the socket listener, the session token requires `net_bind`.
session_token: if manifest.capabilities.net_bind.is_empty() {
None
} else {
ctx.session_token.clone()
},
interceptor_handles: Vec::new(),
allowance_store: ctx.allowance_store.clone(),
identity_store: ctx.identity_store.clone(),
background_processes: std::collections::HashMap::new(),
next_process_id: 1,
process_tracker: process_tracker.clone(),
};
// `user_data_ref` keeps access to `HostState` after `user_data` is handed
// to `register_host_functions`.
let user_data = UserData::new(host_state);
let user_data_ref = user_data.clone();
// Pre-scan the raw bytes for a `run` export: the timeout decision below is
// made on the manifest, before the plugin is built.
let has_run_export = wasm_exports_contain_run(&wasm_bytes);
let extism_wasm = Wasm::data(wasm_bytes);
// NOTE(review): 1024 is Extism's memory-max setting — presumably counted in
// WASM pages; confirm against the Extism docs.
let mut extism_manifest = Manifest::new([extism_wasm]).with_memory_max(1024);
let is_daemon = !manifest.uplinks.is_empty()
|| !manifest.cron_jobs.is_empty()
|| manifest.capabilities.uplink;
// The 10-second timeout applies only to capsules that are neither daemons
// nor exporters of `run`.
if !is_daemon && !has_run_export {
extism_manifest = extism_manifest.with_timeout(std::time::Duration::from_secs(10));
}
let builder = PluginBuilder::new(extism_manifest).with_wasi(true);
let builder = register_host_functions(builder, user_data);
let plugin = builder.build().map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to build Extism plugin: {e}"))
})?;
// Cross-check the pre-scan against the built plugin; a disagreement means
// the timeout decision above may be wrong, so loading is refused.
let has_run = plugin.function_exists("run");
if has_run != has_run_export {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"pre-scan/post-build run() export mismatch \
(pre-scan: {has_run_export}, post-build: {has_run}). \
Cannot safely determine timeout."
)));
}
// Run-loop capsules get a readiness channel: the sender goes into
// `HostState`, the receiver is returned to the engine.
let ready_rx = if has_run {
let (ready_tx, ready_rx) = tokio::sync::watch::channel(false);
let ud = user_data_ref.get().map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to access HostState: {e}"))
})?;
ud.lock()
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("HostState lock poisoned: {e}"))
})?
.ready_tx = Some(ready_tx);
Some(ready_rx)
} else {
None
};
// Run-loop capsules are subscribed to their interceptor topics up front.
if has_run && !manifest.interceptors.is_empty() {
const MAX_AUTO_SUBSCRIBE: usize = 64;
if manifest.interceptors.len() > MAX_AUTO_SUBSCRIBE {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"Capsule '{}' declares {} interceptors, exceeding the \
auto-subscribe limit ({MAX_AUTO_SUBSCRIBE})",
manifest.package.name,
manifest.interceptors.len()
)));
}
// Validate every topic before subscribing to any of them.
for interceptor in &manifest.interceptors {
if !crate::dispatcher::has_valid_segments(&interceptor.event) {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"Interceptor event '{}' has invalid segment structure \
(empty segments, leading/trailing dots, or empty string)",
interceptor.event
)));
}
}
let ud = user_data_ref.get().map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to access HostState: {e}"))
})?;
let mut state = ud.lock().map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("HostState lock poisoned: {e}"))
})?;
// Each interceptor gets its own subscription id; the id, action and topic
// are recorded together in `interceptor_handles`.
for interceptor in &manifest.interceptors {
let receiver = state.event_bus.subscribe_topic(&interceptor.event);
let handle_id = state.next_subscription_id;
state.next_subscription_id = state.next_subscription_id.wrapping_add(1);
state.subscriptions.insert(handle_id, receiver);
state
.interceptor_handles
.push(host_state::InterceptorHandle {
handle_id,
action: interceptor.action.clone(),
topic: interceptor.event.clone(),
});
}
tracing::debug!(
capsule = %manifest.package.name,
count = manifest.interceptors.len(),
"Auto-subscribed interceptors for run-loop capsule"
);
}
Ok::<_, CapsuleError>((plugin, rx, has_run, ready_rx))
})?;
// Associate this load's UUID with the capsule id in the registry, if any.
if let Some(registry) = &ctx.capsule_registry {
let capsule_id = crate::capsule::CapsuleId::new(&self.manifest.package.name)
.map_err(|e| CapsuleError::UnsupportedEntryPoint(e.to_string()))?;
registry
.write()
.await
.register_uuid(capsule_uuid, capsule_id);
}
let plugin_arc = Arc::new(Mutex::new(plugin));
self.cancel_token = Some(cancel_token.clone());
// Capsules with `host_process` capabilities get a listener task that kills
// tracked processes on tool-cancel events. Its join handle is not kept; the
// task ends when the token is cancelled or the receiver yields `None`.
if !self.manifest.capabilities.host_process.is_empty() {
let bus = ctx.event_bus.clone();
let tracker = process_tracker_for_listener;
let ct = cancel_token.clone();
let capsule_name = self.manifest.package.name.clone();
tokio::task::spawn(async move {
let mut receiver = bus.subscribe_topic("tool.v1.request.cancel");
let handle = tokio::runtime::Handle::current();
loop {
tokio::select! {
// `biased` polls the cancellation branch first.
biased;
() = ct.cancelled() => break,
event = receiver.recv() => {
match event.as_deref() {
Some(astrid_events::AstridEvent::Ipc { message, .. }) => {
if let astrid_events::ipc::IpcPayload::ToolCancelRequest { call_ids } = &message.payload {
tracing::info!(
capsule = %capsule_name,
?call_ids,
"Received tool cancel event, killing tracked processes"
);
tracker.cancel_by_call_ids(call_ids, &handle);
}
},
// Other events are ignored; `None` from `recv` ends the listener.
Some(_) => {}, None => break, }
}
}
}
});
}
if has_run {
// Run-loop capsule: the plugin moves into a background task that calls
// `run`. `self.plugin` and `self.tools` are left untouched on this path.
self.ready_rx = ready_rx.map(tokio::sync::Mutex::new);
let capsule_name = self.manifest.package.name.clone();
self.run_handle = Some(tokio::task::spawn(async move {
tracing::info!(capsule = %capsule_name, "Starting background WASM run loop");
// `run` is called synchronously; the plugin mutex stays locked for as long
// as the call executes.
tokio::task::block_in_place(|| {
let mut p = match plugin_arc.lock() {
Ok(guard) => guard,
Err(e) => {
tracing::error!(capsule = %capsule_name, error = %e, "WASM plugin lock was poisoned");
return;
},
};
if let Err(e) = p.call::<(), ()>("run", ()) {
tracing::error!(capsule = %capsule_name, error = %e, "WASM background loop failed");
}
});
}));
} else {
// No `run` export: expose each manifest tool backed by the shared plugin and
// keep the plugin for `invoke_interceptor`.
let mut tools: Vec<Arc<dyn crate::tool::CapsuleTool>> = Vec::new();
for t in &self.manifest.tools {
tools.push(Arc::new(tool::WasmCapsuleTool::new(
t.name.clone(),
t.description.clone(),
t.input_schema.clone(),
Arc::clone(&plugin_arc),
)));
}
self.tools = tools;
self.plugin = Some(plugin_arc);
}
// `rx` is `Some` only when the manifest declares uplinks.
self.inbound_rx = rx;
Ok(())
}
/// Tears down the state that `load` created.
///
/// Cancels the shared cancellation token, aborts the background run task if
/// one exists, then drops the plugin and readiness receiver and empties the
/// tool list. Always returns `Ok(())`.
async fn unload(&mut self) -> CapsuleResult<()> {
    info!(
        capsule = %self.manifest.package.name,
        "Unloading WASM component"
    );

    // Cancel first, so holders of a token clone see it before the abort.
    if let Some(shutdown) = self.cancel_token.take() {
        shutdown.cancel();
    }
    if let Some(run_task) = self.run_handle.take() {
        run_task.abort();
    }

    // Release the remaining per-load state in the same order as before.
    drop(self.plugin.take());
    drop(self.ready_rx.take());
    self.tools.clear();

    Ok(())
}
/// Waits up to `timeout` for the capsule to signal readiness.
///
/// Returns `Ready` at once when there is no readiness channel, `Ready` when
/// the channel value becomes `true`, `Crashed` when waiting on the channel
/// fails, and `Timeout` when `timeout` elapses first.
async fn wait_ready(&self, timeout: std::time::Duration) -> crate::capsule::ReadyStatus {
    use crate::capsule::ReadyStatus;

    let mut signal = match self.ready_rx.as_ref() {
        // Clone the receiver so the lock is held only for the clone.
        Some(shared) => shared.lock().await.clone(),
        None => return ReadyStatus::Ready,
    };

    match tokio::time::timeout(timeout, signal.wait_for(|ready| *ready)).await {
        Err(_) => ReadyStatus::Timeout,
        Ok(Err(_)) => ReadyStatus::Crashed,
        Ok(Ok(_)) => ReadyStatus::Ready,
    }
}
/// Hands out the inbound message receiver, leaving `None` in its place.
///
/// Later calls return `None` until the field is set again.
fn take_inbound_rx(
    &mut self,
) -> Option<tokio::sync::mpsc::Receiver<astrid_core::InboundMessage>> {
    std::mem::take(&mut self.inbound_rx)
}
/// Returns the tools currently exposed by this engine.
fn tools(&self) -> &[Arc<dyn crate::tool::CapsuleTool>] {
    self.tools.as_slice()
}
/// Calls the plugin's `astrid_hook_trigger` export for an interceptor.
///
/// The request is a JSON object with `name` set to `action` and `arguments`
/// set to `payload`; the export's raw output bytes are returned.
///
/// # Errors
///
/// * `CapsuleError::NotSupported` when the engine holds no plugin.
/// * `CapsuleError::ExecutionFailed` when the request cannot be serialized.
/// * `CapsuleError::WasmError` when the plugin lock is poisoned or the call fails.
fn invoke_interceptor(&self, action: &str, payload: &[u8]) -> CapsuleResult<Vec<u8>> {
    let Some(shared_plugin) = self.plugin.as_ref() else {
        return Err(CapsuleError::NotSupported(
            "plugin handles interceptors internally via IPC auto-subscribe".into(),
        ));
    };

    let envelope = serde_json::json!({
        "name": action,
        "arguments": payload,
    });
    let encoded = match serde_json::to_vec(&envelope) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(CapsuleError::ExecutionFailed(format!(
                "failed to serialize interceptor request: {e}"
            )));
        },
    };

    // The plugin call is synchronous, so it runs under `block_in_place`.
    tokio::task::block_in_place(|| {
        let mut guard = match shared_plugin.lock() {
            Ok(guard) => guard,
            Err(e) => {
                return Err(CapsuleError::WasmError(format!("plugin lock poisoned: {e}")));
            },
        };
        match guard.call::<&[u8], Vec<u8>>("astrid_hook_trigger", &encoded) {
            Ok(output) => Ok(output),
            Err(e) => Err(CapsuleError::WasmError(format!(
                "astrid_hook_trigger failed: {e:?}"
            ))),
        }
    })
}
fn check_health(&self) -> crate::capsule::CapsuleState {
if let Some(handle) = &self.run_handle
&& handle.is_finished()
{
return crate::capsule::CapsuleState::Failed(
"WASM run loop exited unexpectedly".into(),
);
}
crate::capsule::CapsuleState::Ready
}
}
/// Inputs for [`run_lifecycle`]: the module to run plus the host resources
/// that are moved into the temporary `HostState` built for the hook.
pub struct LifecycleConfig {
/// Raw WASM module bytes from which the plugin is built.
pub wasm_bytes: Vec<u8>,
/// Capsule identity, stored in `HostState` and used in log fields.
pub capsule_id: crate::capsule::CapsuleId,
/// Directory registered as the VFS root for the hook.
pub workspace_root: PathBuf,
/// Key-value store handed to the host state.
pub kv: astrid_storage::ScopedKvStore,
/// Event bus handed to the host state.
pub event_bus: astrid_events::EventBus,
/// Plugin configuration values handed to the host state.
pub config: std::collections::HashMap<String, serde_json::Value>,
/// Secret store handed to the host state.
pub secret_store: std::sync::Arc<dyn astrid_storage::secret::SecretStore>,
}
/// Runs a capsule's install or upgrade hook in a plugin built just for it.
///
/// The export called is `astrid_install` or `astrid_upgrade`, chosen by
/// `phase`. The plugin gets a `HostState` with a plain host VFS rooted at
/// `cfg.workspace_root`, and with no security gate, registry, uplinks,
/// sockets or interceptors. If the module lacks the export, the hook is
/// skipped and `Ok(())` is returned. The hook's input string is
/// `previous_version`, or `""` when it is `None`.
///
/// # Errors
///
/// * `CapsuleError::UnsupportedEntryPoint` when VFS registration or the
///   plugin build fails.
/// * `CapsuleError::ExecutionFailed` when the hook call itself fails.
// NOTE(review): this function calls `Handle::block_on` directly — presumably
// callers must not invoke it from an async task; confirm at call sites.
pub fn run_lifecycle(
    cfg: LifecycleConfig,
    phase: LifecyclePhase,
    previous_version: Option<&str>,
) -> CapsuleResult<()> {
    let export_name = match phase {
        LifecyclePhase::Install => "astrid_install",
        LifecyclePhase::Upgrade => "astrid_upgrade",
    };

    // Register the workspace root on a plain host VFS (no overlay here).
    let lifecycle_vfs = astrid_vfs::HostVfs::new();
    let vfs_root = astrid_capabilities::DirHandle::new();
    let runtime = tokio::runtime::Handle::current();
    let registration = runtime.block_on(async {
        lifecycle_vfs
            .register_dir(vfs_root.clone(), cfg.workspace_root.clone())
            .await
    });
    if let Err(e) = registration {
        return Err(CapsuleError::UnsupportedEntryPoint(format!(
            "Failed to register VFS directory for lifecycle: {e}"
        )));
    }

    let host_state = HostState {
        capsule_uuid: uuid::Uuid::new_v4(),
        caller_context: None,
        capsule_id: cfg.capsule_id.clone(),
        workspace_root: cfg.workspace_root,
        vfs: Arc::new(lifecycle_vfs),
        vfs_root_handle: vfs_root,
        global_root: None,
        global_vfs: None,
        global_vfs_root_handle: None,
        overlay_vfs: None,
        upper_dir: None,
        kv: cfg.kv,
        event_bus: cfg.event_bus,
        ipc_limiter: astrid_events::ipc::IpcRateLimiter::new(),
        subscriptions: std::collections::HashMap::new(),
        next_subscription_id: 1,
        config: cfg.config,
        ipc_publish_patterns: Vec::new(),
        ipc_subscribe_patterns: Vec::new(),
        security: None,
        hook_manager: None,
        capsule_registry: None,
        runtime_handle: runtime,
        has_uplink_capability: false,
        inbound_tx: None,
        registered_uplinks: Vec::new(),
        cli_socket_listener: None,
        active_streams: std::collections::HashMap::new(),
        next_stream_id: 1,
        active_http_streams: std::collections::HashMap::new(),
        next_http_stream_id: 1,
        // Records which lifecycle phase this host state was built for.
        lifecycle_phase: Some(phase),
        secret_store: cfg.secret_store,
        ready_tx: None,
        host_semaphore: HostState::default_host_semaphore(),
        cancel_token: tokio_util::sync::CancellationToken::new(),
        session_token: None,
        interceptor_handles: Vec::new(),
        allowance_store: None,
        identity_store: None,
        background_processes: std::collections::HashMap::new(),
        next_process_id: 1,
        process_tracker: Arc::new(host::process::ProcessTracker::new()),
    };

    // Build the plugin with the same host functions as the main engine.
    let user_data = UserData::new(host_state);
    let plugin_manifest = Manifest::new([Wasm::data(cfg.wasm_bytes)]).with_memory_max(1024);
    let builder = register_host_functions(
        PluginBuilder::new(plugin_manifest).with_wasi(true),
        user_data,
    );
    let mut plugin = match builder.build() {
        Ok(built) => built,
        Err(e) => {
            return Err(CapsuleError::UnsupportedEntryPoint(format!(
                "Failed to build Extism plugin for lifecycle: {e}"
            )));
        },
    };

    // A module that does not export the hook is skipped, not treated as an error.
    if !plugin.function_exists(export_name) {
        tracing::debug!(
            capsule = %cfg.capsule_id,
            export = export_name,
            "Capsule does not export lifecycle hook, skipping"
        );
        return Ok(());
    }

    tracing::info!(
        capsule = %cfg.capsule_id,
        phase = ?phase,
        previous_version = previous_version.unwrap_or("(none)"),
        "Running lifecycle hook"
    );

    let hook_input = previous_version.unwrap_or("");
    if let Err(e) = plugin.call::<&str, ()>(export_name, hook_input) {
        return Err(CapsuleError::ExecutionFailed(format!(
            "lifecycle hook {export_name} failed: {e}"
        )));
    }

    tracing::info!(
        capsule = %cfg.capsule_id,
        phase = ?phase,
        "Lifecycle hook completed successfully"
    );
    Ok(())
}
/// Scans raw WASM bytes for a function export named `run`.
///
/// Only the first export section encountered is inspected. Any parse error,
/// in the binary or in a single export entry, is logged and answered with
/// `true`. Returns `false` when no matching export is found.
fn wasm_exports_contain_run(wasm_bytes: &[u8]) -> bool {
    for payload in wasmparser::Parser::new(0).parse_all(wasm_bytes) {
        let section = match payload {
            Ok(wasmparser::Payload::ExportSection(section)) => section,
            Ok(_) => continue,
            Err(e) => {
                tracing::warn!("failed to pre-scan WASM binary: {e}");
                return true;
            },
        };

        for export in section {
            match export {
                Ok(e) if e.name == "run" && e.kind == wasmparser::ExternalKind::Func => {
                    return true;
                },
                Ok(_) => {},
                Err(e) => {
                    tracing::warn!("failed to parse WASM export entry: {e}");
                    return true;
                },
            }
        }
        // The first export section decides the answer.
        return false;
    }
    false
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
/// Poisons `mutex` by panicking on a helper thread while it holds the lock.
///
/// The helper thread is joined before returning, so the mutex is already
/// poisoned when this function returns.
fn poison_mutex<T: Send + 'static>(mutex: &Arc<Mutex<T>>) {
    let shared = Arc::clone(mutex);
    let poisoner = std::thread::spawn(move || {
        let _held = shared.lock().unwrap();
        panic!("intentional panic to poison mutex");
    });
    // The join result is the expected panic payload; it is discarded.
    drop(poisoner.join());
}
/// Mirrors the run loop's lock handling: a poisoned lock must be logged and
/// handled, not turned into a panic.
#[tokio::test]
async fn poisoned_lock_in_run_loop_does_not_panic() {
    let plugin_arc: Arc<Mutex<String>> = Arc::new(Mutex::new("fake_plugin".into()));
    poison_mutex(&plugin_arc);

    let result = tokio::task::spawn_blocking(move || {
        let capsule_name = "test-capsule";
        let attempt = plugin_arc.lock();
        match attempt {
            Ok(_guard) => true,
            Err(e) => {
                tracing::error!(capsule = %capsule_name, error = %e, "WASM plugin lock was poisoned");
                false
            },
        }
    })
    .await;

    assert!(result.is_ok(), "spawn_blocking should not panic");
    assert!(!result.unwrap(), "should have taken the poison error path");
}
/// Mirrors the interceptor path's lock handling: a poisoned lock becomes a
/// `WasmError` whose message mentions the poisoning.
#[test]
fn poisoned_lock_in_interceptor_returns_error() {
    let plugin: Arc<Mutex<String>> = Arc::new(Mutex::new("fake_plugin".into()));
    poison_mutex(&plugin);

    let result: CapsuleResult<Vec<u8>> = match plugin.lock() {
        Ok(_guard) => Ok(vec![]),
        Err(e) => Err(CapsuleError::WasmError(format!("plugin lock poisoned: {e}"))),
    };

    assert!(result.is_err());
    let err = result.unwrap_err();
    assert!(
        matches!(err, CapsuleError::WasmError(_)),
        "expected WasmError, got: {err:?}"
    );
    let msg = err.to_string();
    assert!(
        msg.contains("poisoned"),
        "error message should mention poisoning: {msg}"
    );
}
/// A plain string definition yields a text field carrying its request as the
/// prompt, its description, and no default.
#[test]
fn build_onboarding_field_text() {
    use astrid_events::ipc::OnboardingFieldType;

    let env_def = crate::manifest::EnvDef {
        env_type: "string".into(),
        request: Some("Enter owner address".into()),
        description: Some("The wallet address".into()),
        enum_values: vec![],
        placeholder: None,
        default: None,
    };

    let built = crate::engine::build_onboarding_field("owner", &env_def);

    assert_eq!(built.key, "owner");
    assert_eq!(built.prompt, "Enter owner address");
    assert_eq!(built.description.as_deref(), Some("The wallet address"));
    assert_eq!(built.field_type, OnboardingFieldType::Text);
    assert!(built.default.is_none());
}
/// A `secret` definition yields a secret field even when it lists an enum value.
#[test]
fn build_onboarding_field_secret() {
    use astrid_events::ipc::OnboardingFieldType;

    let env_def = crate::manifest::EnvDef {
        env_type: "secret".into(),
        enum_values: vec!["a".into()],
        request: None,
        description: None,
        default: None,
        placeholder: None,
    };

    let built = crate::engine::build_onboarding_field("apiKey", &env_def);

    assert_eq!(built.field_type, OnboardingFieldType::Secret);
}
/// Two enum values yield an enum field, and the JSON default is carried over
/// as a string.
#[test]
fn build_onboarding_field_enum_with_default() {
    use astrid_events::ipc::OnboardingFieldType;

    let env_def = crate::manifest::EnvDef {
        env_type: "string".into(),
        request: Some("Select network".into()),
        enum_values: vec!["testnet".into(), "mainnet".into()],
        default: Some(serde_json::json!("testnet")),
        description: None,
        placeholder: None,
    };

    let built = crate::engine::build_onboarding_field("network", &env_def);

    assert_eq!(
        built.field_type,
        OnboardingFieldType::Enum(vec!["testnet".into(), "mainnet".into()])
    );
    assert_eq!(built.default.as_deref(), Some("testnet"));
}
/// Without a `request`, the prompt falls back to a generic sentence naming the key.
#[test]
fn build_onboarding_field_fallback_prompt() {
    let env_def = crate::manifest::EnvDef {
        env_type: "string".into(),
        enum_values: vec![],
        request: None,
        description: None,
        default: None,
        placeholder: None,
    };

    let built = crate::engine::build_onboarding_field("someKey", &env_def);

    assert_eq!(built.prompt, "Please enter value for someKey");
}
/// A single enum value yields a text field whose default is that value.
#[test]
fn build_onboarding_field_single_enum_degrades_to_text_with_autofill() {
    use astrid_events::ipc::OnboardingFieldType;

    let env_def = crate::manifest::EnvDef {
        env_type: "string".into(),
        enum_values: vec!["only".into()],
        request: None,
        description: None,
        default: None,
        placeholder: None,
    };

    let built = crate::engine::build_onboarding_field("single", &env_def);

    assert_eq!(
        built.field_type,
        OnboardingFieldType::Text,
        "Single-choice enum should degrade to text"
    );
    assert_eq!(
        built.default.as_deref(),
        Some("only"),
        "Single-choice enum should auto-fill the sole valid value"
    );
}
/// An `array` definition yields an array field and keeps its request as the prompt.
#[test]
fn build_onboarding_field_array() {
    use astrid_events::ipc::OnboardingFieldType;

    let env_def = crate::manifest::EnvDef {
        env_type: "array".into(),
        request: Some("Enter relay URLs".into()),
        description: Some("Nostr relay endpoints".into()),
        enum_values: vec![],
        default: None,
        placeholder: None,
    };

    let built = crate::engine::build_onboarding_field("relays", &env_def);

    assert_eq!(built.field_type, OnboardingFieldType::Array);
    assert_eq!(built.prompt, "Enter relay URLs");
}
/// A string definition with no enum values yields a text field.
#[test]
fn build_onboarding_field_empty_enum_degrades_to_text() {
    use astrid_events::ipc::OnboardingFieldType;

    let env_def = crate::manifest::EnvDef {
        env_type: "string".into(),
        enum_values: vec![],
        request: None,
        description: None,
        default: None,
        placeholder: None,
    };

    let built = crate::engine::build_onboarding_field("empty", &env_def);

    assert_eq!(
        built.field_type,
        OnboardingFieldType::Text,
        "Empty enum should degrade to text"
    );
}
/// Test helper that applies the engine's readiness wait to a bare receiver.
///
/// Returns `Ready` when the value becomes `true`, `Crashed` when waiting on
/// the channel fails, and `Timeout` when `timeout` elapses first.
async fn wait_ready_from_rx(
    rx: &tokio::sync::Mutex<tokio::sync::watch::Receiver<bool>>,
    timeout: std::time::Duration,
) -> crate::capsule::ReadyStatus {
    use crate::capsule::ReadyStatus;

    // Clone the receiver so the lock is released before waiting.
    let mut signal = {
        let guard = rx.lock().await;
        guard.clone()
    };

    match tokio::time::timeout(timeout, signal.wait_for(|ready| *ready)).await {
        Err(_) => ReadyStatus::Timeout,
        Ok(Err(_)) => ReadyStatus::Crashed,
        Ok(Ok(_)) => ReadyStatus::Ready,
    }
}
#[tokio::test]
async fn wait_ready_returns_ready_when_pre_signaled() {
let (tx, rx) = tokio::sync::watch::channel(false);
let _ = tx.send(true);
let rx_mutex = tokio::sync::Mutex::new(rx);
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(100)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Ready);
}
#[tokio::test]
async fn wait_ready_returns_timeout_when_never_signaled() {
let (_tx, rx) = tokio::sync::watch::channel(false);
let rx_mutex = tokio::sync::Mutex::new(rx);
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(10)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Timeout);
}
#[tokio::test]
async fn wait_ready_returns_crashed_when_sender_dropped() {
let (tx, rx) = tokio::sync::watch::channel(false);
drop(tx); let rx_mutex = tokio::sync::Mutex::new(rx);
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(100)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Crashed);
}
#[tokio::test]
async fn wait_ready_returns_ready_when_signaled_after_delay() {
let (tx, rx) = tokio::sync::watch::channel(false);
let rx_mutex = tokio::sync::Mutex::new(rx);
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let _ = tx.send(true);
});
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(500)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Ready);
}
/// Encodes a WASM module with one `() -> ()` function per name in
/// `export_names`, each exported under that name as a function.
///
/// Sections are emitted in the order type, function, export, code. An empty
/// `export_names` still emits all four sections.
fn build_wasm_module(export_names: &[&str]) -> Vec<u8> {
    use wasm_encoder::{
        CodeSection, ExportKind, ExportSection, Function, FunctionSection, Module, TypeSection,
    };

    // Type 0 is the single signature shared by every function.
    let mut types = TypeSection::new();
    types.ty().function(vec![], vec![]);

    // Build the per-function sections together, one entry per export name.
    let mut functions = FunctionSection::new();
    let mut exports = ExportSection::new();
    let mut code = CodeSection::new();
    for (index, name) in export_names.iter().enumerate() {
        functions.function(0);
        exports.export(*name, ExportKind::Func, index as u32);
        let mut body = Function::new(vec![]);
        body.instruction(&wasm_encoder::Instruction::End);
        code.function(&body);
    }

    let mut module = Module::new();
    module.section(&types);
    module.section(&functions);
    module.section(&exports);
    module.section(&code);
    module.finish()
}
/// A module whose only export is the function `run` is detected.
#[test]
fn prescan_detects_run_export() {
    let exports = ["run"];
    let module_bytes = build_wasm_module(&exports);
    let found = wasm_exports_contain_run(&module_bytes);
    assert!(found, "should detect run export");
}
/// Exports that do not include `run` are not reported as a run loop.
#[test]
fn prescan_returns_false_without_run() {
    let exports = ["tool_call", "install"];
    let module_bytes = build_wasm_module(&exports);
    let found = wasm_exports_contain_run(&module_bytes);
    assert!(!found, "should not detect run when absent");
}
/// `run` is found even when it is not the first export.
#[test]
fn prescan_detects_run_among_multiple_exports() {
    let exports = ["install", "run", "tool_call"];
    let module_bytes = build_wasm_module(&exports);
    let found = wasm_exports_contain_run(&module_bytes);
    assert!(found, "should detect run among multiple exports");
}
/// An export section with no entries contains no `run`.
#[test]
fn prescan_returns_false_for_empty_export_section() {
    let exports: [&str; 0] = [];
    let module_bytes = build_wasm_module(&exports);
    let found = wasm_exports_contain_run(&module_bytes);
    assert!(!found, "empty export section should not have run");
}
/// A module that has only a type section (no export section) contains no `run`.
#[test]
fn prescan_returns_false_for_module_with_no_export_section() {
    use wasm_encoder::{Module, TypeSection};

    let mut types = TypeSection::new();
    types.ty().function(vec![], vec![]);

    let mut module = Module::new();
    module.section(&types);
    let module_bytes = module.finish();

    let found = wasm_exports_contain_run(&module_bytes);
    assert!(!found, "module with no export section should not have run");
}
/// Bytes that are not a WASM module make the pre-scan answer `true`.
#[test]
fn prescan_returns_true_for_corrupt_binary() {
    let garbage: &[u8] = b"not a wasm module at all";
    let found = wasm_exports_contain_run(garbage);
    assert!(
        found,
        "corrupt binary should default to true (safe: no timeout)"
    );
}
/// An export named `run` that is a global, not a function, is ignored.
#[test]
fn prescan_ignores_non_func_run_export() {
    use wasm_encoder::{
        ExportKind, ExportSection, GlobalSection, GlobalType, Module, TypeSection, ValType,
    };

    let mut types = TypeSection::new();
    types.ty().function(vec![], vec![]);

    // One immutable i32 global, initialised to 42.
    let run_global = GlobalType {
        val_type: ValType::I32,
        mutable: false,
        shared: false,
    };
    let mut globals = GlobalSection::new();
    globals.global(run_global, &wasm_encoder::ConstExpr::i32_const(42));

    // Export that global under the name `run`.
    let mut exports = ExportSection::new();
    exports.export("run", ExportKind::Global, 0);

    let mut module = Module::new();
    module.section(&types);
    module.section(&globals);
    module.section(&exports);
    let module_bytes = module.finish();

    let found = wasm_exports_contain_run(&module_bytes);
    assert!(
        !found,
        "global named 'run' should not be detected as a function export"
    );
}
}