use async_trait::async_trait;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as AsyncMutex;
use tracing::info;
use wasmtime::Store;
use wasmtime::component::{Component, Linker};
use crate::context::CapsuleContext;
use crate::engine::ExecutionEngine;
use crate::engine::wasm::host_state::{HostState, LifecyclePhase, PrincipalMount};
use crate::error::{CapsuleError, CapsuleResult};
use crate::manifest::CapsuleManifest;
pub mod bindings;
pub mod host;
pub mod host_state;
pub mod limits;
mod pool;
#[cfg(test)]
mod test_fixtures;
fn today_date_string() -> String {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
let secs = now.as_secs();
let days = secs / 86400;
let (y, m, d) = civil_from_days(days as i64);
format!("{y:04}-{m:02}-{d:02}")
}
#[expect(clippy::arithmetic_side_effects)]
fn civil_from_days(days: i64) -> (i64, u32, u32) {
let z = days + 719_468;
let era = z.div_euclid(146_097);
let doe = z.rem_euclid(146_097) as u32;
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365;
let y = (yoe as i64) + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
(y, m, d)
}
fn prune_old_logs(log_dir: &std::path::Path, max_days: u64) {
let cutoff = std::time::SystemTime::now()
.checked_sub(std::time::Duration::from_secs(max_days * 86400))
.unwrap_or(std::time::UNIX_EPOCH);
let Ok(entries) = std::fs::read_dir(log_dir) else {
return;
};
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if !name_str.ends_with(".log") || name_str.len() != 14 {
continue;
}
if let Ok(meta) = entry.metadata()
&& let Ok(modified) = meta.modified()
&& modified < cutoff
{
let _ = std::fs::remove_file(entry.path());
}
}
}
fn read_expected_wasm_hash(capsule_dir: &std::path::Path) -> Option<String> {
let meta_path = capsule_dir.join("meta.json");
let content = std::fs::read_to_string(&meta_path).ok()?;
let meta: serde_json::Value = serde_json::from_str(&content).ok()?;
meta.get("wasm_hash")?.as_str().map(String::from)
}
fn resolve_content_addressed_wasm(capsule_dir: &std::path::Path) -> Option<PathBuf> {
let meta_path = capsule_dir.join("meta.json");
let content = std::fs::read_to_string(&meta_path).ok()?;
let meta: serde_json::Value = serde_json::from_str(&content).ok()?;
let hash = meta.get("wasm_hash")?.as_str()?;
let home = astrid_core::dirs::AstridHome::resolve().ok()?;
let wasm_path = home.bin_dir().join(format!("{hash}.wasm"));
if wasm_path.exists() {
Some(wasm_path)
} else {
None
}
}
const WASM_CAPSULE_TIMEOUT_SECS: u64 = 5 * 60;
const EPOCH_TICK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
pub struct WasmEngine {
manifest: CapsuleManifest,
_capsule_dir: PathBuf,
wasmtime_engine: Option<wasmtime::Engine>,
pool: Option<pool::CapsuleInstancePool>,
inbound_rx: Option<tokio::sync::mpsc::Receiver<astrid_core::InboundMessage>>,
run_handle: Option<tokio::task::JoinHandle<()>>,
ready_rx: Option<tokio::sync::Mutex<tokio::sync::watch::Receiver<bool>>>,
cancel_token: Option<tokio_util::sync::CancellationToken>,
epoch_ticker: Option<EpochTickerGuard>,
profile_cache: Option<Arc<crate::profile_cache::PrincipalProfileCache>>,
owner_principal: Option<astrid_core::PrincipalId>,
overlay_registry: Option<Arc<astrid_vfs::OverlayVfsRegistry>>,
fuel_ledger: crate::FuelLedger,
memory_ledger: crate::MemoryLedger,
fuel_rate: crate::FuelRateLimiter,
group_config: Option<Arc<astrid_core::GroupConfig>>,
runtime_limits: limits::CapsuleRuntimeLimits,
}
impl WasmEngine {
pub fn new(
manifest: CapsuleManifest,
capsule_dir: PathBuf,
fuel_ledger: crate::FuelLedger,
fuel_rate: crate::FuelRateLimiter,
memory_ledger: crate::MemoryLedger,
runtime_limits: limits::CapsuleRuntimeLimits,
) -> Self {
Self {
manifest,
_capsule_dir: capsule_dir,
wasmtime_engine: None,
pool: None,
inbound_rx: None,
run_handle: None,
ready_rx: None,
cancel_token: None,
epoch_ticker: None,
profile_cache: None,
owner_principal: None,
overlay_registry: None,
fuel_ledger,
memory_ledger,
fuel_rate,
group_config: None,
runtime_limits,
}
}
}
const WASM_MAX_MEMORY_BYTES: usize = 64 * 1024 * 1024;
const DEFAULT_RUN_LOOP_WINDOW_TICKS: u64 = 50;
const MAX_NO_YIELD_WINDOWS: u32 = 3;
const INTERCEPTOR_FUEL_BUDGET: u64 = 10_000_000_000;
pub fn configure_kernel_linker(
linker: &mut wasmtime::component::Linker<HostState>,
) -> wasmtime::Result<()> {
bindings::Kernel::add_to_linker::<HostState, wasmtime::component::HasSelf<HostState>>(
linker,
|state| state,
)
}
fn build_wasmtime_engine() -> CapsuleResult<wasmtime::Engine> {
let mut config = wasmtime::Config::new();
config.wasm_component_model(true).epoch_interruption(true);
config.consume_fuel(true);
#[allow(deprecated)]
config.async_support(true);
wasmtime::Engine::new(&config).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to create wasmtime engine: {e}"))
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct RunLoopBudget {
exempt: bool,
bound_run_loop: bool,
window_ticks: Option<u64>,
mem_bytes: usize,
}
pub(crate) fn resolve_exemption(
owner_profile: Option<&astrid_core::profile::PrincipalProfile>,
group_config: Option<&astrid_core::GroupConfig>,
principal: &astrid_core::PrincipalId,
) -> bool {
let (Some(profile), Some(groups)) = (owner_profile, group_config) else {
return false;
};
let check = astrid_capabilities::CapabilityCheck::new(profile, groups, principal.clone());
astrid_core::EXEMPT_CAPABILITIES
.iter()
.any(|&cap| check.has(cap))
}
const AUDIT_FIREHOSE_CAP: &str = "audit:read_all";
pub(crate) fn resolve_audit_firehose(
owner_profile: Option<&astrid_core::profile::PrincipalProfile>,
group_config: Option<&astrid_core::GroupConfig>,
principal: &astrid_core::PrincipalId,
) -> bool {
let (Some(profile), Some(groups)) = (owner_profile, group_config) else {
return false;
};
astrid_capabilities::CapabilityCheck::new(profile, groups, principal.clone())
.has(AUDIT_FIREHOSE_CAP)
}
fn cpu_rate_deny(
fuel_rate: &crate::FuelRateLimiter,
invocation_profile: Option<&astrid_core::profile::PrincipalProfile>,
group_config: Option<&astrid_core::GroupConfig>,
principal: &astrid_core::PrincipalId,
now: std::time::Instant,
) -> Option<String> {
if resolve_exemption(invocation_profile, group_config, principal) {
return None;
}
let budget = invocation_profile
.map(|p| p.quotas.max_cpu_fuel_per_sec)
.unwrap_or(astrid_core::profile::DEFAULT_MAX_CPU_FUEL_PER_SEC);
if budget > 0 && fuel_rate.over_budget(principal, budget, now) {
return Some(format!(
"principal '{principal}' exceeded CPU budget of {budget} fuel/sec"
));
}
None
}
pub(crate) fn resolve_run_loop_budget(
owner_profile: Option<&astrid_core::profile::PrincipalProfile>,
group_config: Option<&astrid_core::GroupConfig>,
principal: &astrid_core::PrincipalId,
has_run_export: bool,
) -> RunLoopBudget {
let exempt = resolve_exemption(owner_profile, group_config, principal);
let bound_run_loop = has_run_export && !exempt;
let window_ticks = if bound_run_loop {
let ticks = owner_profile
.map(|p| {
let secs = p.quotas.max_timeout_secs;
let by_secs = secs.saturating_mul(1000) / EPOCH_TICK_INTERVAL.as_millis() as u64;
by_secs.clamp(1, DEFAULT_RUN_LOOP_WINDOW_TICKS)
})
.unwrap_or(DEFAULT_RUN_LOOP_WINDOW_TICKS);
Some(ticks.max(1))
} else {
None
};
let mem_bytes = if bound_run_loop {
owner_profile
.map(|p| usize::try_from(p.quotas.max_memory_bytes).unwrap_or(usize::MAX))
.unwrap_or(WASM_MAX_MEMORY_BYTES)
} else {
WASM_MAX_MEMORY_BYTES
};
RunLoopBudget {
exempt,
bound_run_loop,
window_ticks,
mem_bytes,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EpochAction {
Yield(u64),
Interrupt,
}
pub(crate) fn epoch_decision(
recv_yielded: bool,
no_yield_windows: u32,
window_ticks: u64,
max: u32,
) -> (EpochAction, bool, u32) {
if recv_yielded {
(EpochAction::Yield(window_ticks), false, 0)
} else {
let next = no_yield_windows.saturating_add(1);
if next >= max {
(EpochAction::Interrupt, false, next)
} else {
(EpochAction::Yield(window_ticks), false, next)
}
}
}
fn build_wasi_ctx() -> wasmtime_wasi::WasiCtx {
wasmtime_wasi::WasiCtxBuilder::new()
.inherit_stderr()
.build()
}
#[derive(Default)]
pub(crate) struct PrincipalVfsBundle {
home: Option<PrincipalMount>,
tmp: Option<PrincipalMount>,
}
pub(crate) async fn mount_dir(root: &std::path::Path) -> Option<PrincipalMount> {
if !root.exists() {
return None;
}
let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
let vfs = astrid_vfs::HostVfs::new();
let handle = astrid_capabilities::DirHandle::new();
match vfs.register_dir(handle.clone(), canonical.clone()).await {
Ok(()) => Some(PrincipalMount {
root: canonical,
vfs: Arc::new(vfs) as Arc<dyn astrid_vfs::Vfs>,
handle,
}),
Err(e) => {
tracing::warn!(
root = %canonical.display(),
error = %e,
"failed to register principal VFS; denying scheme access",
);
None
},
}
}
pub(crate) async fn build_principal_vfs_bundle(
principal: &astrid_core::PrincipalId,
) -> PrincipalVfsBundle {
let Ok(astrid_home) = astrid_core::dirs::AstridHome::resolve() else {
return PrincipalVfsBundle::default();
};
build_principal_vfs_bundle_at(&astrid_home.principal_home(principal)).await
}
pub(crate) fn open_capsule_log(
principal: &astrid_core::PrincipalId,
capsule_name: &str,
prune: bool,
) -> Option<Arc<Mutex<std::fs::File>>> {
let astrid_home = astrid_core::dirs::AstridHome::resolve().ok()?;
open_capsule_log_at(&astrid_home.principal_home(principal), capsule_name, prune)
}
pub(crate) fn load_invocation_env_overlay(
principal: &astrid_core::PrincipalId,
capsule_id: &str,
) -> Option<std::collections::HashMap<String, String>> {
const MAX_ENV_FILE_BYTES: u64 = 1 << 20;
let astrid_home = astrid_core::dirs::AstridHome::resolve().ok()?;
let env_path = astrid_home
.principal_home(principal)
.env_dir()
.join(format!("{capsule_id}.env.json"));
let meta = std::fs::metadata(&env_path).ok()?;
if !meta.is_file() || meta.len() > MAX_ENV_FILE_BYTES {
return None;
}
let contents = std::fs::read_to_string(&env_path).ok()?;
serde_json::from_str::<std::collections::HashMap<String, String>>(&contents).ok()
}
fn open_capsule_log_at(
ph: &astrid_core::dirs::PrincipalHome,
capsule_name: &str,
prune: bool,
) -> Option<Arc<Mutex<std::fs::File>>> {
if !ph.root().exists() {
return None;
}
let log_dir = ph.log_dir().join(capsule_name);
std::fs::create_dir_all(&log_dir).ok()?;
if prune {
prune_old_logs(&log_dir, 7);
}
let today = today_date_string();
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_dir.join(format!("{today}.log")))
.ok()
.map(|f| Arc::new(Mutex::new(f)))
}
async fn build_principal_vfs_bundle_at(
ph: &astrid_core::dirs::PrincipalHome,
) -> PrincipalVfsBundle {
let home = mount_dir(ph.root()).await;
let tmp = if home.is_some() {
let t = ph.tmp_dir();
if t.exists() || std::fs::create_dir_all(&t).is_ok() {
mount_dir(&t).await
} else {
None
}
} else {
None
};
PrincipalVfsBundle { home, tmp }
}
fn check_principal_enabled(
profile: &astrid_core::profile::PrincipalProfile,
invoking: &astrid_core::PrincipalId,
capsule_name: &str,
action: &str,
) -> Result<(), CapsuleError> {
if profile.enabled {
return Ok(());
}
tracing::warn!(
security_event = true,
principal = %invoking,
capsule = %capsule_name,
action = action,
"Disabled principal denied at Layer 3 — fail-closed (issue #672)"
);
Err(CapsuleError::WasmError(format!(
"principal '{invoking}' is disabled"
)))
}
pub struct EpochTickerGuard {
handle: Option<std::thread::JoinHandle<()>>,
stop: Arc<std::sync::atomic::AtomicBool>,
}
impl Drop for EpochTickerGuard {
fn drop(&mut self) {
self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}
fn spawn_epoch_ticker(engine: &wasmtime::Engine) -> EpochTickerGuard {
let engine = engine.clone();
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let stop_clone = stop.clone();
let handle = std::thread::Builder::new()
.name("wasm-epoch-ticker".into())
.spawn(move || {
while !stop_clone.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(EPOCH_TICK_INTERVAL);
engine.increment_epoch();
}
})
.expect("failed to spawn epoch ticker thread");
EpochTickerGuard {
handle: Some(handle),
stop,
}
}
#[async_trait]
impl ExecutionEngine for WasmEngine {
async fn load(&mut self, ctx: &CapsuleContext) -> CapsuleResult<()> {
info!(
capsule = %self.manifest.package.name,
"Loading WASM component (Component Model)"
);
let component = self.manifest.components.first().ok_or_else(|| {
CapsuleError::UnsupportedEntryPoint(
"WASM engine requires at least one component definition".into(),
)
})?;
let wasm_path = if component.path.is_absolute() {
component.path.clone()
} else {
let local = self._capsule_dir.join(&component.path);
if local.exists() {
local
} else {
resolve_content_addressed_wasm(&self._capsule_dir).unwrap_or(local)
}
};
let workspace_root = ctx.workspace_root.clone();
let kv = ctx.kv.clone();
let event_bus = astrid_events::EventBus::clone(&ctx.event_bus);
let manifest = self.manifest.clone();
let mut wasm_config = std::collections::HashMap::new();
if let Ok(astrid_home) = astrid_core::dirs::AstridHome::resolve() {
wasm_config.insert(
"ASTRID_SOCKET_PATH".to_string(),
serde_json::Value::String(astrid_home.socket_path().to_string_lossy().into_owned()),
);
}
let reserved_keys: Vec<String> = wasm_config.keys().cloned().collect();
let resolved_env =
super::resolve_env(&self.manifest, ctx, &reserved_keys, "wasm_engine").await?;
for (key, val) in resolved_env {
wasm_config.insert(key, serde_json::Value::String(val));
}
let capsule_uuid = uuid::Uuid::new_v4();
let blocking_semaphore = self.runtime_limits.blocking_semaphore();
let io_semaphore = self.runtime_limits.io_semaphore();
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_token_for_state = cancel_token.clone();
let process_tracker = Arc::new(crate::engine::wasm::host::process::ProcessTracker::new());
let process_tracker_for_listener = process_tracker.clone();
let persistent_registry = Arc::new(
crate::engine::wasm::host::process::PersistentProcessRegistry::new(
tokio::runtime::Handle::current(),
),
);
let persistent_registry_for_reaper = persistent_registry.clone();
let memory_ledger = self.memory_ledger.clone();
let capsule_dir_for_verify = self._capsule_dir.clone();
let (pool_opt, store_arc, run_instance, rx, has_run, ready_rx, wt_engine) = async {
let wasm_bytes = std::fs::read(&wasm_path).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to read WASM: {e}"))
})?;
let actual_hash = blake3::hash(&wasm_bytes).to_hex().to_string();
match read_expected_wasm_hash(&capsule_dir_for_verify) {
Some(expected_hash) if actual_hash == expected_hash => {
},
Some(expected_hash) => {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"WASM integrity check failed: expected BLAKE3 {expected_hash}, \
got {actual_hash}. The binary may have been tampered with."
)));
},
None => {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"WASM capsule '{}' has no BLAKE3 hash in meta.json. \
Capsules must be installed via `astrid capsule install` \
which records the hash. Refusing to load unverified binary.",
manifest.package.name
)));
},
}
let (tx, rx) = if !manifest.uplinks.is_empty() {
let (tx, rx) = tokio::sync::mpsc::channel(128);
(Some(tx), Some(rx))
} else {
(None, None)
};
let lower_vfs = astrid_vfs::HostVfs::new();
let upper_vfs = astrid_vfs::HostVfs::new();
let root_handle = astrid_capabilities::DirHandle::new();
let home_root = ctx.home_root.clone();
let upper_temp = tempfile::TempDir::new().map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to create overlay temp dir: {e}"
))
})?;
async {
lower_vfs
.register_dir(root_handle.clone(), workspace_root.clone())
.await?;
upper_vfs
.register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
.await?;
Ok::<(), astrid_vfs::VfsError>(())
}
.await
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to register VFS directory: {e}"
))
})?;
let home_mount: Option<PrincipalMount> = match home_root.as_deref() {
Some(g_root) if !g_root.exists() => {
tracing::warn!(
home_root = %g_root.display(),
"home:// VFS not mounted: directory does not exist. \
Capsules requesting home:// paths will receive errors \
until the directory is created and the kernel is restarted."
);
None
},
Some(g_root) => mount_dir(g_root).await,
None => None,
};
let overlay_vfs = Arc::new(astrid_vfs::OverlayVfs::new(
Box::new(lower_vfs),
Box::new(upper_vfs),
));
let gate_home_root = home_mount.as_ref().map(|m| m.root.clone());
let security_gate = Arc::new(crate::security::ManifestSecurityGate::new(
manifest.clone(),
workspace_root.clone(),
gate_home_root,
));
let tmp_mount: Option<PrincipalMount> = match astrid_core::dirs::AstridHome::resolve() {
Ok(astrid_home) => {
let dir = astrid_home.principal_home(&ctx.principal).tmp_dir();
if dir.exists() || std::fs::create_dir_all(&dir).is_ok() {
mount_dir(&dir).await
} else {
None
}
},
Err(_) => None,
};
let capsule_log = open_capsule_log(&ctx.principal, &manifest.package.name, true);
let secret_store = astrid_storage::build_secret_store(
&manifest.package.name,
kv.clone(),
tokio::runtime::Handle::current(),
);
let capsule_id_val = crate::capsule::CapsuleId::new(&manifest.package.name)
.map_err(|e| CapsuleError::UnsupportedEntryPoint(e.to_string()))?;
let secret_env_set: std::collections::HashSet<String> = manifest
.env
.iter()
.filter(|(_, d)| d.env_type.eq_ignore_ascii_case("secret"))
.map(|(k, _)| k.clone())
.collect();
let ipc_publish_v = manifest.effective_ipc_publish_patterns();
let ipc_subscribe_v = manifest.effective_ipc_subscribe_patterns();
let cli_listener = if manifest.capabilities.net_bind.is_empty() {
None
} else {
ctx.cli_socket_listener.clone()
};
let session_tok = if manifest.capabilities.net_bind.is_empty() {
None
} else {
ctx.session_token.clone()
};
let has_uplink = manifest.capabilities.uplink;
let capability_names = manifest.capabilities.held_names();
let ipc_limiter = Arc::new(astrid_events::ipc::IpcRateLimiter::new());
let upper_dir_arc = Arc::new(upper_temp);
let has_run_export = wasm_exports_contain_run(&wasm_bytes);
let owner_profile: Option<Arc<astrid_core::profile::PrincipalProfile>> =
ctx.profile_cache.as_ref().and_then(|cache| {
cache
.resolve(&ctx.principal)
.map_err(|e| {
tracing::warn!(
principal = %ctx.principal,
error = %e,
"owner profile resolve failed at load; bounding run-loop \
with the default finite budget (fail-secure)"
);
e
})
.ok()
});
let run_budget = resolve_run_loop_budget(
owner_profile.as_deref(),
ctx.group_config.as_deref(),
&ctx.principal,
has_run_export,
);
let run_loop_mem_bytes: usize = run_budget.mem_bytes;
if run_budget.bound_run_loop {
tracing::debug!(
capsule = %manifest.package.name,
principal = %ctx.principal,
window_ticks = ?run_budget.window_ticks,
mem_bytes = run_loop_mem_bytes,
resolved = owner_profile.is_some(),
"Bounding non-exempt run-loop CPU (epoch interrupt) + memory to owner profile quota"
);
}
let audit_firehose = resolve_audit_firehose(
owner_profile.as_deref(),
ctx.group_config.as_deref(),
&ctx.principal,
);
let blocking_semaphore = blocking_semaphore.clone();
let io_semaphore = io_semaphore.clone();
let cancel_token_for_state = cancel_token_for_state.clone();
let process_tracker = process_tracker.clone();
let persistent_registry = persistent_registry.clone();
let memory_ledger = memory_ledger.clone();
let st_principal = ctx.principal.clone();
let st_capsule_registry = ctx.capsule_registry.clone();
let st_allowance_store = ctx.allowance_store.clone();
let st_identity_store = ctx.identity_store.clone();
let st_profile_cache = ctx.profile_cache.clone();
let make_state: Arc<dyn Fn() -> HostState + Send + Sync> = Arc::new(move || HostState {
wasi_ctx: build_wasi_ctx(),
resource_table: wasmtime::component::ResourceTable::new(),
store_meter: crate::memory_ledger::StoreMemoryMeter::new(
run_loop_mem_bytes,
st_principal.clone(),
memory_ledger.clone(),
),
principal: st_principal.clone(),
capsule_uuid,
caller_context: None,
interceptor_active: false,
invocation_kv: None,
capsule_log: capsule_log.clone(),
capsule_id: capsule_id_val.clone(),
workspace_root: workspace_root.clone(),
vfs: Arc::clone(&overlay_vfs) as Arc<dyn astrid_vfs::Vfs>,
vfs_root_handle: root_handle.clone(),
home: home_mount.clone(),
tmp: tmp_mount.clone(),
invocation_home: None,
invocation_tmp: None,
invocation_secret_store: None,
invocation_capsule_log: None,
invocation_profile: None,
profile_cache: st_profile_cache.clone(),
invocation_env_overlay: None,
overlay_vfs: Some(Arc::clone(&overlay_vfs)),
upper_dir: Some(Arc::clone(&upper_dir_arc)),
kv: kv.clone(),
event_bus: event_bus.clone(),
ipc_limiter: Arc::clone(&ipc_limiter),
config: wasm_config.clone(),
secret_env: secret_env_set.clone(),
ipc_publish_patterns: ipc_publish_v.clone(),
ipc_subscribe_patterns: ipc_subscribe_v.clone(),
cli_socket_listener: cli_listener.clone(),
active_http_streams: std::collections::HashMap::new(),
next_http_stream_id: 1,
security: Some(
Arc::clone(&security_gate) as Arc<dyn crate::security::CapsuleSecurityGate>
),
hook_manager: None, capsule_registry: st_capsule_registry.clone(),
runtime_handle: tokio::runtime::Handle::current(),
has_uplink_capability: has_uplink,
capability_names: capability_names.clone(),
audit_firehose,
inbound_tx: tx.clone(),
registered_uplinks: Vec::new(),
lifecycle_phase: None,
secret_store: secret_store.clone(),
ready_tx: None,
blocking_semaphore: blocking_semaphore.clone(),
io_semaphore: io_semaphore.clone(),
cancel_token: cancel_token_for_state.clone(),
session_token: session_tok.clone(),
interceptor_handles: Vec::new(),
allowance_store: st_allowance_store.clone(),
identity_store: st_identity_store.clone(),
process_tracker: process_tracker.clone(),
persistent_processes: persistent_registry.clone(),
net_stream_count: 0,
subscription_count: 0,
process_count_total: 0,
process_count_by_principal: std::collections::HashMap::new(),
recv_yielded: false,
no_yield_windows: 0,
});
let pool_epoch_deadline = if run_budget.exempt {
u64::MAX
} else {
WASM_CAPSULE_TIMEOUT_SECS * 1000 / EPOCH_TICK_INTERVAL.as_millis() as u64
};
let wt_engine = build_wasmtime_engine()?;
let mut linker: Linker<HostState> = Linker::new(&wt_engine);
configure_kernel_linker(&mut linker).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to add Astrid host to linker: {e}"
))
})?;
let wasm_component = Component::from_binary(&wt_engine, &wasm_bytes).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to compile WASM component: {e}"
))
})?;
let instance_pre = linker.instantiate_pre(&wasm_component).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to pre-instantiate WASM component: {e}"
))
})?;
let is_single_store =
has_run_export || !manifest.capabilities.host_process.is_empty();
let (pool_max, pool_min_idle) = if is_single_store {
(1, 1)
} else {
(
self.runtime_limits.instance_pool_size,
self.runtime_limits.instance_pool_min_idle(),
)
};
let builder = pool::InstanceBuilder::new(
wt_engine.clone(),
instance_pre,
Arc::clone(&make_state),
pool_epoch_deadline,
INTERCEPTOR_FUEL_BUDGET,
);
let mut initial_instances: Vec<pool::PooledInstance> =
Vec::with_capacity(pool_min_idle);
for _ in 0..pool_min_idle {
initial_instances.push(builder.build().await?);
}
tracing::debug!(
capsule = %manifest.package.name,
pool_max,
pool_min_idle,
warm = initial_instances.len(),
has_run = has_run_export,
host_process = !manifest.capabilities.host_process.is_empty(),
"Instantiated capsule instance pool"
);
let has_run = has_run_export;
let mut pool_opt: Option<pool::CapsuleInstancePool> = None;
let mut store_arc: Option<Arc<AsyncMutex<Store<HostState>>>> = None;
let mut run_instance: Option<wasmtime::component::Instance> = None;
if has_run {
let mut pi = initial_instances
.pop()
.expect("min_idle >= 1, so the run-loop instance exists");
pi.store.set_fuel(u64::MAX).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to set run-loop fuel: {e}"))
})?;
if let Some(window_ticks) = run_budget.window_ticks {
pi.store.set_epoch_deadline(window_ticks);
pi.store.epoch_deadline_callback(move |mut store_ctx| {
let st = store_ctx.data_mut();
let (action, recv_yielded, no_yield_windows) = epoch_decision(
st.recv_yielded,
st.no_yield_windows,
window_ticks,
MAX_NO_YIELD_WINDOWS,
);
st.recv_yielded = recv_yielded;
st.no_yield_windows = no_yield_windows;
Ok(match action {
EpochAction::Yield(ticks) => wasmtime::UpdateDeadline::Yield(ticks),
EpochAction::Interrupt => wasmtime::UpdateDeadline::Interrupt,
})
});
} else {
pi.store.set_epoch_deadline(u64::MAX);
}
store_arc = Some(Arc::new(AsyncMutex::new(pi.store)));
run_instance = Some(pi.instance);
} else {
let reset_resources_on_return = manifest.capabilities.host_process.is_empty();
pool_opt = Some(pool::CapsuleInstancePool::new(
initial_instances,
pool_max,
pool_min_idle,
reset_resources_on_return,
builder,
&cancel_token,
));
}
let ready_rx = if has_run {
let (ready_tx, ready_rx) = tokio::sync::watch::channel(false);
let mut s = store_arc.as_ref().expect("run-loop has store").lock().await;
s.data_mut().ready_tx = Some(ready_tx);
Some(ready_rx)
} else {
None
};
let effective_interceptors = manifest.effective_interceptors();
if has_run && !effective_interceptors.is_empty() {
const MAX_AUTO_SUBSCRIBE: usize = 64;
if effective_interceptors.len() > MAX_AUTO_SUBSCRIBE {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"Capsule '{}' declares {} interceptors, exceeding the \
auto-subscribe limit ({MAX_AUTO_SUBSCRIBE})",
manifest.package.name,
effective_interceptors.len()
)));
}
for interceptor in &effective_interceptors {
if !crate::topic::has_valid_segments(&interceptor.event) {
return Err(CapsuleError::UnsupportedEntryPoint(format!(
"Interceptor event '{}' has invalid segment structure \
(empty segments, leading/trailing dots, or empty string)",
interceptor.event
)));
}
}
let mut s = store_arc.as_ref().expect("run-loop has store").lock().await;
let state = s.data_mut();
let count = effective_interceptors.len();
for (idx, interceptor) in effective_interceptors.into_iter().enumerate() {
state
.interceptor_handles
.push(host_state::InterceptorHandle {
handle_id: idx as u64,
action: interceptor.action,
topic: interceptor.event,
});
}
tracing::debug!(
capsule = %manifest.package.name,
count,
"Auto-subscribed interceptors for run-loop capsule"
);
}
Ok::<_, CapsuleError>((
pool_opt,
store_arc,
run_instance,
rx,
has_run,
ready_rx,
wt_engine,
))
}
.await?;
let capsule_id = crate::capsule::CapsuleId::new(&self.manifest.package.name)
.map_err(|e| CapsuleError::UnsupportedEntryPoint(e.to_string()))?;
if let Some(registry) = &ctx.capsule_registry {
registry
.write()
.await
.register_uuid(capsule_uuid, capsule_id.clone());
}
ctx.schema_catalog
.register_topics(&capsule_id, &self.manifest)
.await;
self.cancel_token = Some(cancel_token.clone());
self.wasmtime_engine = Some(wt_engine.clone());
self.epoch_ticker = Some(spawn_epoch_ticker(&wt_engine));
if !self.manifest.capabilities.host_process.is_empty() {
let bus = ctx.event_bus.clone();
let tracker = process_tracker_for_listener;
let ct = cancel_token.clone();
let capsule_name = self.manifest.package.name.clone();
tokio::task::spawn(async move {
let mut receiver = bus.subscribe_topic("tool.v1.request.cancel");
let handle = tokio::runtime::Handle::current();
loop {
tokio::select! {
biased;
() = ct.cancelled() => break,
event = receiver.recv() => {
match event.as_deref() {
Some(astrid_events::AstridEvent::Ipc { message, .. }) => {
if let astrid_events::ipc::IpcPayload::ToolCancelRequest { call_ids } = &message.payload {
tracing::info!(
capsule = %capsule_name,
?call_ids,
"Received tool cancel event, killing tracked processes"
);
tracker.cancel_by_call_ids(call_ids, &handle);
}
},
Some(_) => {}, None => break, }
}
}
}
});
let registry = persistent_registry_for_reaper;
let ct = cancel_token.clone();
tokio::task::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(2));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
() = ct.cancelled() => {
registry.shutdown();
break;
}
_ = tick.tick() => {
registry.reap_sweep();
}
}
}
});
}
if has_run {
self.ready_rx = ready_rx.map(tokio::sync::Mutex::new);
let capsule_name = self.manifest.package.name.clone();
let run_store = Arc::clone(store_arc.as_ref().expect("run-loop has store"));
let run_inst = run_instance.expect("run-loop has instance");
self.run_handle = Some(tokio::task::spawn(async move {
tracing::info!(capsule = %capsule_name, "Starting background WASM run loop");
let mut s = run_store.lock().await;
let typed = match run_inst.get_typed_func::<(), ()>(&mut *s, "run") {
Ok(f) => f,
Err(e) => {
tracing::error!(
capsule = %capsule_name,
error = %e,
"WASM background loop missing `run` export"
);
return;
},
};
if let Err(e) = typed.call_async(&mut *s, ()).await {
tracing::error!(
capsule = %capsule_name,
error = %e,
"WASM background loop failed"
);
}
}));
} else {
self.pool = pool_opt;
}
self.inbound_rx = rx;
self.profile_cache = ctx.profile_cache.clone();
self.overlay_registry = ctx.overlay_registry.clone();
self.owner_principal = Some(ctx.principal.clone());
self.group_config = ctx.group_config.clone();
Ok(())
}
async fn unload(&mut self) -> CapsuleResult<()> {
info!(
capsule = %self.manifest.package.name,
"Unloading WASM component"
);
if let Some(token) = self.cancel_token.take() {
token.cancel();
}
if let Some(handle) = self.run_handle.take() {
handle.abort();
}
drop(self.epoch_ticker.take());
self.pool = None;
self.wasmtime_engine = None;
self.ready_rx = None; Ok(())
}
async fn wait_ready(&self, timeout: std::time::Duration) -> crate::capsule::ReadyStatus {
use crate::capsule::ReadyStatus;
let Some(rx_mutex) = &self.ready_rx else {
return ReadyStatus::Ready;
};
let mut rx = rx_mutex.lock().await.clone();
match tokio::time::timeout(timeout, rx.wait_for(|&v| v)).await {
Ok(Ok(_)) => ReadyStatus::Ready,
Ok(Err(_)) => ReadyStatus::Crashed, Err(_) => ReadyStatus::Timeout,
}
}
fn take_inbound_rx(
&mut self,
) -> Option<tokio::sync::mpsc::Receiver<astrid_core::InboundMessage>> {
self.inbound_rx.take()
}
async fn invoke_interceptor(
&self,
action: &str,
payload: &[u8],
caller: Option<&astrid_events::ipc::IpcMessage>,
) -> CapsuleResult<crate::capsule::InterceptResult> {
let pool = self.pool.as_ref().ok_or_else(|| {
CapsuleError::NotSupported(
"plugin handles interceptors internally via IPC auto-subscribe".into(),
)
})?;
let invoking_principal = caller
.and_then(|msg| msg.principal.as_deref())
.and_then(|p| astrid_core::PrincipalId::new(p).ok())
.or_else(|| self.owner_principal.clone())
.unwrap_or_default();
let invoke_start = std::time::Instant::now();
let invocation_profile: Option<Arc<astrid_core::profile::PrincipalProfile>> =
match self.profile_cache.as_ref() {
Some(cache) => {
let profile = cache.resolve(&invoking_principal).map_err(|e| {
tracing::error!(principal = %invoking_principal, error = %e,
"profile load failed; denying invocation (issue #666)");
CapsuleError::WasmError(format!(
"principal '{invoking_principal}' profile invalid: {e}"
))
})?;
check_principal_enabled(
&profile,
&invoking_principal,
self.manifest.package.name.as_str(),
action,
)?;
Some(profile)
},
None => None,
};
let now = std::time::Instant::now();
if let Some(reason) = cpu_rate_deny(
&self.fuel_rate,
invocation_profile.as_deref(),
self.group_config.as_deref(),
&invoking_principal,
now,
) {
tracing::warn!(
principal = %invoking_principal,
capsule = %self.manifest.package.name,
action,
"CPU-rate budget exceeded; denying invocation (per-principal throttle)"
);
return Ok(crate::capsule::InterceptResult::Deny { reason });
}
let is_daemon = !self.manifest.uplinks.is_empty() || self.manifest.capabilities.uplink;
if let Some(registry) = self.overlay_registry.as_ref() {
let invoking = caller
.and_then(|msg| msg.principal.as_deref())
.and_then(|p| astrid_core::PrincipalId::new(p).ok())
.or_else(|| self.owner_principal.clone())
.unwrap_or_default();
let resolved = registry.resolve(&invoking).await;
if let Err(e) = resolved {
tracing::error!(
principal = %invoking,
error = %e,
"overlay registry resolve failed; denying invocation (issue #668)"
);
return Err(CapsuleError::WasmError(format!(
"principal '{invoking}' overlay resolve failed: {e}"
)));
}
}
type HookTriggerResult = bindings::astrid::guest::lifecycle::CapsuleResult;
let checkout_start = std::time::Instant::now();
let mut checkout = pool.checkout().await.ok_or_else(|| {
CapsuleError::NotSupported("no capsule instance available".into())
})?;
let pool_wait_ms = checkout_start.elapsed().as_millis() as u64;
let typed_instance = checkout.instance();
let result: CapsuleResult<HookTriggerResult> = {
let s = checkout.store_mut();
let applied_profile: Arc<astrid_core::profile::PrincipalProfile> =
invocation_profile.clone().unwrap_or_else(|| {
Arc::new(astrid_core::profile::PrincipalProfile::default_ref().clone())
});
if !is_daemon {
let deadline = applied_profile.quotas.max_timeout_secs.saturating_mul(1000)
/ EPOCH_TICK_INTERVAL.as_millis() as u64;
s.set_epoch_deadline(deadline);
}
let _ = s.set_fuel(INTERCEPTOR_FUEL_BUDGET);
{
let state = s.data_mut();
state.caller_context = caller.cloned();
state.interceptor_active = true;
state.store_meter.set(
usize::try_from(applied_profile.quotas.max_memory_bytes).unwrap_or(usize::MAX),
invoking_principal.clone(),
);
state.invocation_profile = invocation_profile.clone();
let invocation_principal: Option<astrid_core::PrincipalId> = caller
.and_then(|msg| msg.principal.as_deref())
.and_then(|p| astrid_core::PrincipalId::new(p).ok())
.filter(|p| *p != state.principal);
state.invocation_kv = invocation_principal.as_ref().and_then(|p| {
let ns = format!("{}:capsule:{}", p, state.capsule_id);
match state.kv.with_namespace(&ns) {
Ok(kv) => Some(kv),
Err(e) => {
tracing::warn!(
principal = %p,
error = %e,
"Failed to create invocation KV scope"
);
None
},
}
});
if let Some(ref p) = invocation_principal {
let bundle = build_principal_vfs_bundle(p).await;
state.invocation_home = bundle.home;
state.invocation_tmp = bundle.tmp;
state.invocation_capsule_log =
open_capsule_log(p, state.capsule_id.as_str(), false);
state.invocation_env_overlay =
load_invocation_env_overlay(p, state.capsule_id.as_str());
state.invocation_secret_store = state.invocation_kv.as_ref().map(|kv| {
astrid_storage::build_secret_store(
&format!("{}:{}", state.capsule_id, p),
kv.clone(),
state.runtime_handle.clone(),
)
});
}
}
let typed_lookup = typed_instance
.get_typed_func::<(String, Vec<u8>), (HookTriggerResult,)>(
&mut *s,
"astrid-hook-trigger",
);
match typed_lookup {
Ok(func) => func
.call_async(&mut *s, (action.to_string(), payload.to_vec()))
.await
.map(|(cr,)| cr)
.map_err(|e| {
CapsuleError::WasmError(format!("astrid_hook_trigger failed: {e:?}"))
}),
Err(e) => Err(CapsuleError::UnsupportedEntryPoint(format!(
"capsule does not export `astrid-hook-trigger`: {e}"
))),
}
};
let fuel_after = checkout.store_mut().get_fuel().unwrap_or(0);
let fuel_used = INTERCEPTOR_FUEL_BUDGET.saturating_sub(fuel_after);
self.fuel_ledger.charge(&invoking_principal, fuel_used);
self.fuel_rate
.record(&invoking_principal, fuel_used, std::time::Instant::now());
drop(checkout);
let invoke_ms = invoke_start.elapsed().as_millis() as u64;
metrics::histogram!(
"astrid_capsule_invocation_duration_seconds",
"capsule" => self.manifest.package.name.clone(),
"action" => action.to_string(),
)
.record(invoke_start.elapsed().as_secs_f64());
tracing::debug!(
target: "astrid.sample",
capsule = %self.manifest.package.name,
action,
principal = %invoking_principal,
pool_wait_ms,
invoke_ms,
fuel_used,
ok = result.is_ok(),
"interceptor invocation"
);
result.map(|cr| {
crate::capsule::InterceptResult::from_capsule_result(&cr.action, cr.data.as_deref())
})
}
fn check_health(&self) -> crate::capsule::CapsuleState {
if let Some(handle) = &self.run_handle
&& handle.is_finished()
{
return crate::capsule::CapsuleState::Failed(
"WASM run loop exited unexpectedly".into(),
);
}
crate::capsule::CapsuleState::Ready
}
}
pub struct LifecycleConfig {
pub wasm_bytes: Vec<u8>,
pub capsule_id: crate::capsule::CapsuleId,
pub workspace_root: PathBuf,
pub home_root: Option<PathBuf>,
pub kv: astrid_storage::ScopedKvStore,
pub event_bus: astrid_events::EventBus,
pub config: std::collections::HashMap<String, serde_json::Value>,
pub secret_store: std::sync::Arc<dyn astrid_storage::secret::SecretStore>,
}
pub async fn run_lifecycle(
cfg: LifecycleConfig,
phase: LifecyclePhase,
previous_version: Option<&str>,
) -> CapsuleResult<()> {
let export_name = match phase {
LifecyclePhase::Install => "astrid-install",
LifecyclePhase::Upgrade => "astrid-upgrade",
};
let has_export = wasm_exports_contain(export_name, &cfg.wasm_bytes);
if !has_export {
tracing::debug!(
capsule = %cfg.capsule_id,
export = export_name,
"Capsule does not export lifecycle hook, skipping"
);
return Ok(());
}
let vfs = astrid_vfs::HostVfs::new();
let root_handle = astrid_capabilities::DirHandle::new();
vfs.register_dir(root_handle.clone(), cfg.workspace_root.clone())
.await
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to register VFS directory for lifecycle: {e}"
))
})?;
let home_mount: Option<PrincipalMount> = match cfg.home_root.as_ref() {
Some(h_root) => {
let canonical = h_root.canonicalize().unwrap_or_else(|_| h_root.clone());
mount_dir(&canonical).await
},
None => None,
};
let host_state = HostState {
wasi_ctx: build_wasi_ctx(),
store_meter: crate::memory_ledger::StoreMemoryMeter::new(
WASM_MAX_MEMORY_BYTES,
astrid_core::PrincipalId::default(),
crate::MemoryLedger::default(),
),
resource_table: wasmtime::component::ResourceTable::new(),
principal: astrid_core::PrincipalId::default(),
capsule_uuid: uuid::Uuid::new_v4(),
caller_context: None,
interceptor_active: false,
invocation_kv: None,
capsule_log: None,
capsule_id: cfg.capsule_id.clone(),
workspace_root: cfg.workspace_root,
vfs: Arc::new(vfs),
vfs_root_handle: root_handle,
home: home_mount,
tmp: None,
invocation_home: None,
invocation_tmp: None,
invocation_secret_store: None,
invocation_capsule_log: None,
invocation_profile: None,
profile_cache: None,
invocation_env_overlay: None,
overlay_vfs: None,
upper_dir: None,
kv: cfg.kv,
event_bus: cfg.event_bus,
ipc_limiter: Arc::new(astrid_events::ipc::IpcRateLimiter::new()),
config: cfg.config,
secret_env: std::collections::HashSet::new(),
ipc_publish_patterns: Vec::new(),
ipc_subscribe_patterns: Vec::new(),
security: None,
hook_manager: None,
capsule_registry: None,
runtime_handle: tokio::runtime::Handle::current(),
has_uplink_capability: false,
capability_names: Vec::new(),
audit_firehose: false,
inbound_tx: None,
registered_uplinks: Vec::new(),
cli_socket_listener: None,
active_http_streams: std::collections::HashMap::new(),
next_http_stream_id: 1,
lifecycle_phase: Some(phase),
secret_store: cfg.secret_store,
ready_tx: None,
blocking_semaphore: HostState::default_blocking_semaphore(),
io_semaphore: HostState::default_io_semaphore(),
cancel_token: tokio_util::sync::CancellationToken::new(),
session_token: None,
interceptor_handles: Vec::new(),
allowance_store: None,
identity_store: None,
process_tracker: Arc::new(host::process::ProcessTracker::new()),
persistent_processes: Arc::new(host::process::PersistentProcessRegistry::new(
tokio::runtime::Handle::current(),
)),
net_stream_count: 0,
subscription_count: 0,
process_count_total: 0,
process_count_by_principal: std::collections::HashMap::new(),
recv_yielded: false,
no_yield_windows: 0,
};
const LIFECYCLE_TIMEOUT_SECS: u64 = 10 * 60;
let wt_engine = build_wasmtime_engine()?;
let mut store = Store::new(&wt_engine, host_state);
let deadline_ticks = LIFECYCLE_TIMEOUT_SECS * 10; store.set_epoch_deadline(deadline_ticks);
store.set_fuel(u64::MAX).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!("Failed to set lifecycle fuel: {e}"))
})?;
let _epoch_guard = spawn_epoch_ticker(&wt_engine);
let mut linker: Linker<HostState> = Linker::new(&wt_engine);
configure_kernel_linker(&mut linker).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to add Astrid host to linker for lifecycle: {e}"
))
})?;
let wasm_component = Component::from_binary(&wt_engine, &cfg.wasm_bytes).map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to compile WASM component for lifecycle: {e}"
))
})?;
let instance = linker
.instantiate_async(&mut store, &wasm_component)
.await
.map_err(|e| {
CapsuleError::UnsupportedEntryPoint(format!(
"Failed to instantiate WASM component for lifecycle: {e}"
))
})?;
tracing::info!(
capsule = %cfg.capsule_id,
phase = ?phase,
previous_version = previous_version.unwrap_or("(none)"),
"Running lifecycle hook"
);
let func = instance
.get_typed_func::<(), ()>(&mut store, export_name)
.map_err(|_| {
CapsuleError::UnsupportedEntryPoint(format!(
"capsule does not export lifecycle hook `{export_name}`"
))
})?;
func.call_async(&mut store, ()).await.map_err(|e| {
CapsuleError::ExecutionFailed(format!("lifecycle hook {export_name} failed: {e}"))
})?;
let _ = phase;
tracing::info!(
capsule = %cfg.capsule_id,
phase = ?phase,
"Lifecycle hook completed successfully"
);
Ok(())
}
fn wasm_exports_contain_run(wasm_bytes: &[u8]) -> bool {
wasm_exports_contain("run", wasm_bytes)
}
const STUB_PRONE_EXPORTS: [&str; 3] = ["run", "astrid-install", "astrid-upgrade"];
fn wasm_exports_contain(name: &str, wasm_bytes: &[u8]) -> bool {
let trio_position = |export_name: &str| -> Option<usize> {
STUB_PRONE_EXPORTS.iter().position(|n| *n == export_name)
};
let resolve = |trio: &[Option<u32>; STUB_PRONE_EXPORTS.len()]| -> Option<bool> {
let pos = trio_position(name)?;
let target = trio[pos]?;
let aliased = trio
.iter()
.enumerate()
.any(|(i, idx)| i != pos && *idx == Some(target));
Some(!aliased)
};
for payload in wasmparser::Parser::new(0).parse_all(wasm_bytes) {
match payload {
Ok(wasmparser::Payload::ExportSection(reader)) => {
let mut trio: [Option<u32>; STUB_PRONE_EXPORTS.len()] =
[None; STUB_PRONE_EXPORTS.len()];
let mut name_present = false;
for export in reader {
let e = match export {
Ok(e) => e,
Err(e) => {
tracing::warn!("failed to parse WASM export entry: {e}");
return true; },
};
if e.kind != wasmparser::ExternalKind::Func {
continue;
}
if e.name == name {
name_present = true;
}
if let Some(pos) = trio_position(e.name) {
trio[pos] = Some(e.index);
}
}
if let Some(real) = resolve(&trio) {
return real;
}
if name_present {
return true;
}
},
Ok(wasmparser::Payload::ComponentExportSection(reader)) => {
let mut trio: [Option<u32>; STUB_PRONE_EXPORTS.len()] =
[None; STUB_PRONE_EXPORTS.len()];
let mut name_present = false;
for export in reader {
let e = match export {
Ok(e) => e,
Err(e) => {
tracing::warn!("failed to parse component export entry: {e}");
return true;
},
};
if e.kind != wasmparser::ComponentExternalKind::Func {
continue;
}
if e.name.0 == name {
name_present = true;
}
if let Some(pos) = trio_position(e.name.0) {
trio[pos] = Some(e.index);
}
}
if let Some(real) = resolve(&trio) {
return real;
}
if name_present {
return true;
}
},
Err(e) => {
tracing::warn!("failed to pre-scan WASM binary: {e}");
return true; },
_ => {},
}
}
false
}
#[cfg(test)]
mod tests {
use super::*;
fn pid(name: &str) -> astrid_core::PrincipalId {
astrid_core::PrincipalId::new(name).unwrap()
}
fn profile_with(
groups: &[&str],
grants: &[&str],
revokes: &[&str],
) -> astrid_core::profile::PrincipalProfile {
astrid_core::profile::PrincipalProfile {
groups: groups.iter().map(|s| (*s).to_string()).collect(),
grants: grants.iter().map(|s| (*s).to_string()).collect(),
revokes: revokes.iter().map(|s| (*s).to_string()).collect(),
..Default::default()
}
}
fn builtin_groups() -> astrid_core::GroupConfig {
astrid_core::GroupConfig::builtin_only()
}
#[test]
fn budget_admin_run_loop_is_exempt_via_capability() {
let p = profile_with(&["admin"], &[], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("default"), true);
assert!(b.exempt, "admin must be exempt via the `*` capability");
assert!(!b.bound_run_loop);
assert_eq!(b.window_ticks, None);
}
#[test]
fn budget_non_admin_run_loop_is_bounded() {
let p = profile_with(&["agent"], &[], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), true);
assert!(!b.exempt, "agent must NOT be exempt");
assert!(b.bound_run_loop);
assert_eq!(b.window_ticks, Some(DEFAULT_RUN_LOOP_WINDOW_TICKS));
assert_eq!(b.mem_bytes, WASM_MAX_MEMORY_BYTES);
}
#[test]
fn budget_capability_grant_exempts_non_admin() {
let p = profile_with(&["agent"], &[astrid_core::CAP_RESOURCES_UNBOUNDED], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), true);
assert!(b.exempt, "explicit grant of the capability must exempt");
assert!(!b.bound_run_loop);
}
#[test]
fn budget_net_bind_capability_exempts_non_admin() {
let p = profile_with(&["agent"], &[astrid_core::CAP_NET_BIND], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("cli"), true);
assert!(
b.exempt,
"granted net_bind capability must exempt the uplink"
);
assert!(!b.bound_run_loop);
}
#[test]
fn budget_uplink_capability_exempts_non_admin() {
let p = profile_with(&["agent"], &[astrid_core::CAP_UPLINK], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("uplink"), true);
assert!(b.exempt, "granted uplink capability must exempt the daemon");
assert!(!b.bound_run_loop);
}
#[test]
fn budget_manifest_declaration_without_grant_is_bounded() {
let p = profile_with(&["agent"], &[], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("self-declarer"), true);
assert!(
!b.exempt,
"a capsule cannot self-exempt by declaring net_bind/uplink in its manifest"
);
assert!(b.bound_run_loop);
}
#[test]
fn budget_revoke_overrides_admin_exemption() {
let p = profile_with(
&["admin"],
&[],
&[
astrid_core::CAP_RESOURCES_UNBOUNDED,
astrid_core::CAP_NET_BIND,
astrid_core::CAP_UPLINK,
],
);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), true);
assert!(
!b.exempt,
"revoking all exemption capabilities must override the admin `*` grant"
);
assert!(b.bound_run_loop);
}
#[test]
fn budget_missing_profile_is_fail_secure_bounded() {
let g = builtin_groups();
let b = resolve_run_loop_budget(None, Some(&g), &pid("ghost"), true);
assert!(!b.exempt, "an unidentifiable principal must NOT be exempt");
assert!(b.bound_run_loop);
assert_eq!(b.window_ticks, Some(DEFAULT_RUN_LOOP_WINDOW_TICKS));
assert_eq!(b.mem_bytes, WASM_MAX_MEMORY_BYTES);
}
#[test]
fn budget_missing_group_config_is_fail_secure_bounded() {
let p = profile_with(&["admin"], &[], &[]);
let b = resolve_run_loop_budget(Some(&p), None, &pid("alice"), true);
assert!(!b.exempt, "missing GroupConfig must fail-secure to bounded");
assert!(b.bound_run_loop);
}
#[test]
fn budget_non_run_loop_capsule_is_not_bounded() {
let p = profile_with(&["agent"], &[], &[]);
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), false);
assert!(!b.bound_run_loop);
assert_eq!(b.window_ticks, None);
assert_eq!(b.mem_bytes, WASM_MAX_MEMORY_BYTES);
}
#[test]
fn budget_uses_owner_memory_quota_for_bound_run_loop() {
let mut p = profile_with(&["agent"], &[], &[]);
p.quotas.max_memory_bytes = 32 * 1024 * 1024;
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), true);
assert!(b.bound_run_loop);
assert_eq!(b.mem_bytes, 32 * 1024 * 1024);
}
#[test]
fn budget_tighter_timeout_shrinks_window() {
let mut p = profile_with(&["agent"], &[], &[]);
p.quotas.max_timeout_secs = 2;
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), true);
assert_eq!(b.window_ticks, Some(20));
}
#[test]
fn budget_long_timeout_clamps_to_default_window() {
let mut p = profile_with(&["agent"], &[], &[]);
p.quotas.max_timeout_secs = 3600;
let g = builtin_groups();
let b = resolve_run_loop_budget(Some(&p), Some(&g), &pid("alice"), true);
assert_eq!(b.window_ticks, Some(DEFAULT_RUN_LOOP_WINDOW_TICKS));
}
#[test]
fn exemption_requires_both_profile_and_groups() {
let p = profile_with(&["admin"], &[], &[]);
let g = builtin_groups();
assert!(resolve_exemption(Some(&p), Some(&g), &pid("a")));
assert!(!resolve_exemption(None, Some(&g), &pid("a")));
assert!(!resolve_exemption(Some(&p), None, &pid("a")));
assert!(!resolve_exemption(None, None, &pid("a")));
}
#[test]
fn exemption_is_false_for_plain_agent() {
let p = profile_with(&["agent"], &[], &[]);
let g = builtin_groups();
assert!(!resolve_exemption(Some(&p), Some(&g), &pid("a")));
}
#[test]
fn audit_firehose_cap_literal_pinned() {
assert_eq!(AUDIT_FIREHOSE_CAP, "audit:read_all");
}
#[test]
fn audit_firehose_holder_true() {
let admin = profile_with(&["admin"], &[], &[]);
let g = builtin_groups();
assert!(resolve_audit_firehose(
Some(&admin),
Some(&g),
&pid("default")
));
let granted = profile_with(&["agent"], &[AUDIT_FIREHOSE_CAP], &[]);
assert!(resolve_audit_firehose(
Some(&granted),
Some(&g),
&pid("alice")
));
}
#[test]
fn audit_firehose_fail_secure_false() {
let g = builtin_groups();
let admin = profile_with(&["admin"], &[], &[]);
assert!(!resolve_audit_firehose(None, Some(&g), &pid("ghost")));
assert!(!resolve_audit_firehose(Some(&admin), None, &pid("default")));
let plain = profile_with(&["agent"], &[], &[]);
assert!(!resolve_audit_firehose(
Some(&plain),
Some(&g),
&pid("alice")
));
}
#[test]
fn audit_firehose_revoke_overrides_admin() {
let p = profile_with(&["admin"], &[], &[AUDIT_FIREHOSE_CAP]);
let g = builtin_groups();
assert!(
!resolve_audit_firehose(Some(&p), Some(&g), &pid("alice")),
"revoking audit:read_all must override the admin `*` grant"
);
}
#[test]
fn audit_firehose_ignores_manifest_by_construction() {
let g = builtin_groups();
let no_cap = profile_with(&["agent"], &[], &[]);
assert!(!resolve_audit_firehose(
Some(&no_cap),
Some(&g),
&pid("self-declarer")
));
let with_cap = profile_with(&["agent"], &[AUDIT_FIREHOSE_CAP], &[]);
assert!(resolve_audit_firehose(
Some(&with_cap),
Some(&g),
&pid("self-declarer")
));
}
const RATE_BUDGET: u64 = 1_000;
fn saturate(
rl: &crate::FuelRateLimiter,
p: &astrid_core::PrincipalId,
now: std::time::Instant,
) {
rl.record(p, RATE_BUDGET * 100, now);
}
fn budgeted_profile(
groups: &[&str],
grants: &[&str],
) -> astrid_core::profile::PrincipalProfile {
let mut p = profile_with(groups, grants, &[]);
p.quotas.max_cpu_fuel_per_sec = RATE_BUDGET;
p
}
#[test]
fn rate_gate_bounded_principal_is_denied_when_over_budget() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("alice");
let prof = budgeted_profile(&["agent"], &[]);
let g = builtin_groups();
saturate(&rl, &p, now);
let decision = cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, now);
assert!(
decision.is_some(),
"a bounded principal over budget must be denied"
);
assert!(
decision.unwrap().contains("alice"),
"the deny reason must name the principal"
);
}
#[test]
fn rate_gate_self_heals_after_window_rolls() {
let rl = crate::FuelRateLimiter::default();
let t0 = std::time::Instant::now();
let p = pid("alice");
let prof = budgeted_profile(&["agent"], &[]);
let g = builtin_groups();
saturate(&rl, &p, t0);
assert!(
cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, t0).is_some(),
"over budget at t0 -> denied"
);
let t1 = t0 + std::time::Duration::from_millis(1_001);
assert!(
cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, t1).is_none(),
"after the 1s window rolls -> admitted again (no permanent brick)"
);
}
#[test]
fn rate_gate_exempt_principal_not_denied_even_over_budget() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("uplink");
let prof = budgeted_profile(&["agent"], &[astrid_core::CAP_RESOURCES_UNBOUNDED]);
let g = builtin_groups();
saturate(&rl, &p, now);
assert!(
cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, now).is_none(),
"an exempt (unbounded) principal must never be CPU-rate denied"
);
}
#[test]
fn rate_gate_admin_with_group_config_is_never_gated() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("default");
let prof = budgeted_profile(&["admin"], &[]);
let g = builtin_groups();
saturate(&rl, &p, now);
assert!(
cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, now).is_none(),
"admin (`*`) with group_config must never be CPU-rate gated"
);
}
#[test]
fn rate_gate_missing_group_config_makes_admin_bounded() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("default");
let prof = budgeted_profile(&["admin"], &[]);
saturate(&rl, &p, now);
assert!(
cpu_rate_deny(&rl, Some(&prof), None, &p, now).is_some(),
"missing group_config must fail-secure: admin becomes bounded and is denied over budget"
);
}
#[test]
fn rate_gate_zero_budget_is_unlimited() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("alice");
let mut prof = profile_with(&["agent"], &[], &[]);
prof.quotas.max_cpu_fuel_per_sec = 0;
let g = builtin_groups();
saturate(&rl, &p, now);
assert!(
cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, now).is_none(),
"a zero (unlimited) budget must never deny, even when saturated"
);
}
#[test]
fn rate_gate_no_profile_uses_generous_default_budget() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("anon");
let g = builtin_groups();
rl.record(&p, 1_000, now);
assert!(
cpu_rate_deny(&rl, None, Some(&g), &p, now).is_none(),
"a principal under the default budget is admitted"
);
rl.record(&p, astrid_core::profile::DEFAULT_MAX_CPU_FUEL_PER_SEC, now);
assert!(
cpu_rate_deny(&rl, None, Some(&g), &p, now).is_some(),
"with no profile the generous DEFAULT budget is still enforced"
);
}
#[test]
fn rate_gate_deny_is_ok_deny_not_err() {
let rl = crate::FuelRateLimiter::default();
let now = std::time::Instant::now();
let p = pid("alice");
let prof = budgeted_profile(&["agent"], &[]);
let g = builtin_groups();
saturate(&rl, &p, now);
let reason = cpu_rate_deny(&rl, Some(&prof), Some(&g), &p, now)
.expect("a saturated bounded principal must be denied");
let result: CapsuleResult<crate::capsule::InterceptResult> =
Ok(crate::capsule::InterceptResult::Deny { reason });
match result {
Ok(crate::capsule::InterceptResult::Deny { reason }) => {
assert!(reason.contains("alice"), "deny reason names the principal");
},
Ok(other) => panic!("deny must be InterceptResult::Deny, got {other:?}"),
Err(e) => panic!(
"deny must be Ok(Deny), NEVER Err — an Err-deny is a silent \
enforcement bypass (dispatcher continues the chain on Err): {e}"
),
}
}
#[test]
fn epoch_recv_loop_never_traps_and_resets() {
let (action, recv, windows) = epoch_decision(true, 99, 50, MAX_NO_YIELD_WINDOWS);
assert_eq!(action, EpochAction::Yield(50));
assert!(!recv, "flag must be cleared after reading");
assert_eq!(windows, 0, "a recv resets the no-yield counter");
}
#[test]
fn epoch_no_recv_yields_during_grace_then_interrupts() {
let max = 3u32;
let (a0, _, w0) = epoch_decision(false, 0, 50, max);
assert_eq!(a0, EpochAction::Yield(50));
assert_eq!(w0, 1);
let (a1, _, w1) = epoch_decision(false, w0, 50, max);
assert_eq!(a1, EpochAction::Yield(50));
assert_eq!(w1, 2);
let (a2, _, w2) = epoch_decision(false, w1, 50, max);
assert_eq!(a2, EpochAction::Interrupt);
assert_eq!(w2, 3);
}
#[test]
fn epoch_recv_every_window_never_interrupts_driven() {
let max = MAX_NO_YIELD_WINDOWS;
let mut no_yield = 0u32;
for window in 0..(max as u64 * 100 + 7) {
let recv_yielded = true;
let (action, new_recv, new_windows) = epoch_decision(recv_yielded, no_yield, 50, max);
assert_eq!(
action,
EpochAction::Yield(50),
"a recv-yielding loop must Yield on window {window}, never Interrupt"
);
assert!(!new_recv, "the flag is always cleared after reading");
assert_eq!(new_windows, 0, "every recv resets the no-yield counter");
no_yield = new_windows;
}
}
#[test]
fn epoch_single_late_recv_restores_full_grace_driven() {
let max = MAX_NO_YIELD_WINDOWS;
assert!(max >= 2, "test assumes a multi-window grace");
let mut no_yield = 0u32;
for _ in 0..(max - 1) {
let (action, _, w) = epoch_decision(false, no_yield, 50, max);
assert_eq!(action, EpochAction::Yield(50));
no_yield = w;
}
assert_eq!(no_yield, max - 1, "primed one window short of the trap");
let (action, _, w) = epoch_decision(true, no_yield, 50, max);
assert_eq!(action, EpochAction::Yield(50));
assert_eq!(w, 0, "one recv at the brink restores the full grace");
no_yield = w;
for window in 0..(max - 1) {
let (action, _, w) = epoch_decision(false, no_yield, 50, max);
assert_eq!(
action,
EpochAction::Yield(50),
"post-reset grace window {window} must Yield, not trap early"
);
no_yield = w;
}
let (action, _, _) = epoch_decision(false, no_yield, 50, max);
assert_eq!(
action,
EpochAction::Interrupt,
"trap lands on the full max-th post-reset window, not earlier"
);
}
#[test]
fn epoch_interrupt_is_immediate_when_max_is_one() {
let (action, _, windows) = epoch_decision(false, 0, 10, 1);
assert_eq!(action, EpochAction::Interrupt);
assert_eq!(windows, 1);
}
#[test]
fn epoch_counter_does_not_overflow() {
let (action, _, windows) = epoch_decision(false, u32::MAX, 10, MAX_NO_YIELD_WINDOWS);
assert_eq!(action, EpochAction::Interrupt);
assert_eq!(windows, u32::MAX);
}
#[test]
fn check_principal_enabled_allows_enabled_profile() {
let profile = astrid_core::profile::PrincipalProfile::default();
assert!(profile.enabled, "default profile must be enabled");
check_principal_enabled(&profile, &pid("alice"), "test-capsule", "do-thing")
.expect("enabled profile must pass the gate");
}
#[test]
fn check_principal_enabled_rejects_disabled_profile() {
let profile = astrid_core::profile::PrincipalProfile {
enabled: false,
..Default::default()
};
let err = check_principal_enabled(&profile, &pid("bob"), "test-capsule", "do-thing")
.expect_err("disabled profile must be denied");
let msg = err.to_string();
assert!(
msg.contains("disabled") && msg.contains("bob"),
"expected error to name principal and reason: {msg}"
);
}
#[test]
fn check_principal_enabled_denies_even_for_admin_group() {
let profile = astrid_core::profile::PrincipalProfile {
groups: vec!["admin".to_string()],
enabled: false,
..Default::default()
};
assert!(check_principal_enabled(&profile, &pid("admin_user"), "x", "y").is_err());
}
#[tokio::test]
async fn clear_on_drop_clears_invocation_state_on_unwind() {
use crate::engine::wasm::host_state::HostState;
use crate::engine::wasm::test_fixtures::minimal_host_state;
fn clear(state: &mut HostState) {
state.caller_context = None;
state.interceptor_active = false;
state.invocation_kv = None;
state.invocation_home = None;
state.invocation_tmp = None;
state.invocation_secret_store = None;
state.invocation_capsule_log = None;
state.invocation_profile = None;
state.invocation_env_overlay = None;
}
let mut state = minimal_host_state(tokio::runtime::Handle::current());
state.interceptor_active = true;
state.caller_context = Some(astrid_events::ipc::IpcMessage::new(
"x",
astrid_events::ipc::IpcPayload::Custom {
data: serde_json::json!({}),
},
uuid::Uuid::nil(),
));
clear(&mut state);
assert!(state.caller_context.is_none());
assert!(!state.interceptor_active);
assert!(state.invocation_kv.is_none());
assert!(state.invocation_home.is_none());
assert!(state.invocation_tmp.is_none());
assert!(state.invocation_secret_store.is_none());
assert!(state.invocation_capsule_log.is_none());
assert!(state.invocation_profile.is_none());
assert!(state.invocation_env_overlay.is_none());
}
#[tokio::test]
async fn ipc_recv_future_drop_leaves_host_state_untouched() {
use crate::engine::wasm::test_fixtures::minimal_host_state;
let mut state = minimal_host_state(tokio::runtime::Handle::current());
let baseline_caller = astrid_events::ipc::IpcMessage::new(
"baseline",
astrid_events::ipc::IpcPayload::Custom {
data: serde_json::json!({}),
},
uuid::Uuid::nil(),
);
state.caller_context = Some(baseline_caller.clone());
let fut = async {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
unreachable!()
};
tokio::select! {
biased;
_ = tokio::time::sleep(std::time::Duration::from_millis(5)) => {},
_ = fut => unreachable!(),
}
assert_eq!(
state.caller_context.as_ref().map(|m| m.topic.clone()),
Some("baseline".to_string()),
"cancelled recv future must not overwrite caller_context"
);
}
#[test]
fn build_onboarding_field_text() {
let def = crate::manifest::EnvDef {
env_type: "string".into(),
request: Some("Enter owner address".into()),
description: Some("The wallet address".into()),
default: None,
enum_values: vec![],
placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("owner", &def);
assert_eq!(field.key, "owner");
assert_eq!(field.prompt, "Enter owner address");
assert_eq!(field.description.as_deref(), Some("The wallet address"));
assert_eq!(
field.field_type,
astrid_events::ipc::OnboardingFieldType::Text
);
assert!(field.default.is_none());
}
#[test]
fn build_onboarding_field_secret() {
let def = crate::manifest::EnvDef {
env_type: "secret".into(),
request: None,
description: None,
default: None,
enum_values: vec!["a".into()], placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("apiKey", &def);
assert_eq!(
field.field_type,
astrid_events::ipc::OnboardingFieldType::Secret
);
}
#[test]
fn build_onboarding_field_enum_with_default() {
let def = crate::manifest::EnvDef {
env_type: "string".into(),
request: Some("Select network".into()),
description: None,
default: Some(serde_json::json!("testnet")),
enum_values: vec!["testnet".into(), "mainnet".into()],
placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("network", &def);
assert_eq!(
field.field_type,
astrid_events::ipc::OnboardingFieldType::Enum(vec!["testnet".into(), "mainnet".into()])
);
assert_eq!(field.default.as_deref(), Some("testnet"));
}
#[test]
fn build_onboarding_field_fallback_prompt() {
let def = crate::manifest::EnvDef {
env_type: "string".into(),
request: None,
description: None,
default: None,
enum_values: vec![],
placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("someKey", &def);
assert_eq!(field.prompt, "Please enter value for someKey");
}
#[test]
fn build_onboarding_field_single_enum_degrades_to_text_with_autofill() {
let def = crate::manifest::EnvDef {
env_type: "string".into(),
request: None,
description: None,
default: None,
enum_values: vec!["only".into()],
placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("single", &def);
assert_eq!(
field.field_type,
astrid_events::ipc::OnboardingFieldType::Text,
"Single-choice enum should degrade to text"
);
assert_eq!(
field.default.as_deref(),
Some("only"),
"Single-choice enum should auto-fill the sole valid value"
);
}
#[test]
fn build_onboarding_field_array() {
let def = crate::manifest::EnvDef {
env_type: "array".into(),
request: Some("Enter relay URLs".into()),
description: Some("Nostr relay endpoints".into()),
default: None,
enum_values: vec![],
placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("relays", &def);
assert_eq!(
field.field_type,
astrid_events::ipc::OnboardingFieldType::Array
);
assert_eq!(field.prompt, "Enter relay URLs");
}
#[test]
fn build_onboarding_field_empty_enum_degrades_to_text() {
let def = crate::manifest::EnvDef {
env_type: "string".into(),
request: None,
description: None,
default: None,
enum_values: vec![],
placeholder: None,
scope: crate::manifest::EnvScope::default(),
};
let field = crate::engine::build_onboarding_field("empty", &def);
assert_eq!(
field.field_type,
astrid_events::ipc::OnboardingFieldType::Text,
"Empty enum should degrade to text"
);
}
async fn wait_ready_from_rx(
rx: &tokio::sync::Mutex<tokio::sync::watch::Receiver<bool>>,
timeout: std::time::Duration,
) -> crate::capsule::ReadyStatus {
use crate::capsule::ReadyStatus;
let mut rx = rx.lock().await.clone();
match tokio::time::timeout(timeout, rx.wait_for(|&v| v)).await {
Ok(Ok(_)) => ReadyStatus::Ready,
Ok(Err(_)) => ReadyStatus::Crashed,
Err(_) => ReadyStatus::Timeout,
}
}
#[tokio::test]
async fn wait_ready_returns_ready_when_pre_signaled() {
let (tx, rx) = tokio::sync::watch::channel(false);
let _ = tx.send(true);
let rx_mutex = tokio::sync::Mutex::new(rx);
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(100)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Ready);
}
#[tokio::test]
async fn wait_ready_returns_timeout_when_never_signaled() {
let (_tx, rx) = tokio::sync::watch::channel(false);
let rx_mutex = tokio::sync::Mutex::new(rx);
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(10)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Timeout);
}
#[tokio::test]
async fn wait_ready_returns_crashed_when_sender_dropped() {
let (tx, rx) = tokio::sync::watch::channel(false);
drop(tx); let rx_mutex = tokio::sync::Mutex::new(rx);
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(100)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Crashed);
}
#[tokio::test]
async fn wait_ready_returns_ready_when_signaled_after_delay() {
let (tx, rx) = tokio::sync::watch::channel(false);
let rx_mutex = tokio::sync::Mutex::new(rx);
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let _ = tx.send(true);
});
let status = wait_ready_from_rx(&rx_mutex, std::time::Duration::from_millis(500)).await;
assert_eq!(status, crate::capsule::ReadyStatus::Ready);
}
fn build_wasm_module(export_names: &[&str]) -> Vec<u8> {
use wasm_encoder::{
CodeSection, ExportKind, ExportSection, Function, FunctionSection, Module, TypeSection,
};
let mut module = Module::new();
let mut types = TypeSection::new();
types.ty().function(vec![], vec![]);
module.section(&types);
let mut functions = FunctionSection::new();
for _ in export_names {
functions.function(0);
}
module.section(&functions);
let mut exports = ExportSection::new();
for (i, name) in export_names.iter().enumerate() {
exports.export(name, ExportKind::Func, i as u32);
}
module.section(&exports);
let mut code = CodeSection::new();
for _ in export_names {
let mut f = Function::new(vec![]);
f.instruction(&wasm_encoder::Instruction::End);
code.function(&f);
}
module.section(&code);
module.finish()
}
#[test]
fn prescan_detects_run_export() {
let wasm = build_wasm_module(&["run"]);
assert!(wasm_exports_contain_run(&wasm), "should detect run export");
}
#[test]
fn prescan_returns_false_without_run() {
let wasm = build_wasm_module(&["tool_call", "install"]);
assert!(
!wasm_exports_contain_run(&wasm),
"should not detect run when absent"
);
}
#[test]
fn prescan_detects_run_among_multiple_exports() {
let wasm = build_wasm_module(&["install", "run", "tool_call"]);
assert!(
wasm_exports_contain_run(&wasm),
"should detect run among multiple exports"
);
}
#[test]
fn prescan_returns_false_for_empty_export_section() {
let wasm = build_wasm_module(&[]);
assert!(
!wasm_exports_contain_run(&wasm),
"empty export section should not have run"
);
}
#[test]
fn prescan_returns_false_for_module_with_no_export_section() {
use wasm_encoder::{Module, TypeSection};
let mut module = Module::new();
let mut types = TypeSection::new();
types.ty().function(vec![], vec![]);
module.section(&types);
let wasm = module.finish();
assert!(
!wasm_exports_contain_run(&wasm),
"module with no export section should not have run"
);
}
#[test]
fn prescan_returns_true_for_corrupt_binary() {
let garbage = b"not a wasm module at all";
assert!(
wasm_exports_contain_run(garbage),
"corrupt binary should default to true (safe: no timeout)"
);
}
fn build_wasm_module_with_aliases(exports: &[(&str, u32)]) -> Vec<u8> {
use wasm_encoder::{
CodeSection, ExportKind, ExportSection, Function, FunctionSection, Module, TypeSection,
};
let mut module = Module::new();
let mut types = TypeSection::new();
types.ty().function(vec![], vec![]);
module.section(&types);
let max_idx = exports.iter().map(|(_, i)| *i).max().unwrap_or(0);
let func_count = (max_idx + 1) as usize;
let mut functions = FunctionSection::new();
for _ in 0..func_count {
functions.function(0);
}
module.section(&functions);
let mut export_section = ExportSection::new();
for (name, idx) in exports {
export_section.export(name, ExportKind::Func, *idx);
}
module.section(&export_section);
let mut code = CodeSection::new();
for _ in 0..func_count {
let mut f = Function::new(vec![]);
f.instruction(&wasm_encoder::Instruction::End);
code.function(&f);
}
module.section(&code);
module.finish()
}
#[test]
fn prescan_rejects_run_aliased_with_install_and_upgrade() {
let wasm = build_wasm_module_with_aliases(&[
("astrid-hook-trigger", 0),
("run", 1),
("astrid-install", 1),
("astrid-upgrade", 1),
]);
assert!(
!wasm_exports_contain_run(&wasm),
"stub run aliased to install/upgrade must be treated as no run loop"
);
}
#[test]
fn prescan_accepts_run_distinct_from_install_stubs() {
let wasm = build_wasm_module_with_aliases(&[
("astrid-hook-trigger", 0),
("run", 1),
("astrid-install", 2),
("astrid-upgrade", 2),
]);
assert!(
wasm_exports_contain_run(&wasm),
"run distinct from aliased install/upgrade stubs is a real run loop"
);
}
#[test]
fn prescan_accepts_all_three_distinct_implementations() {
let wasm = build_wasm_module_with_aliases(&[
("astrid-hook-trigger", 0),
("run", 1),
("astrid-install", 2),
("astrid-upgrade", 3),
]);
assert!(wasm_exports_contain_run(&wasm));
assert!(wasm_exports_contain("astrid-install", &wasm));
assert!(wasm_exports_contain("astrid-upgrade", &wasm));
}
#[test]
fn prescan_distinguishes_real_install_from_run_upgrade_stubs() {
let wasm = build_wasm_module_with_aliases(&[
("astrid-hook-trigger", 0),
("run", 1),
("astrid-upgrade", 1),
("astrid-install", 2),
]);
assert!(
!wasm_exports_contain_run(&wasm),
"run aliased to upgrade is a stub even when install is real"
);
assert!(
wasm_exports_contain("astrid-install", &wasm),
"install with a unique index is real"
);
assert!(
!wasm_exports_contain("astrid-upgrade", &wasm),
"upgrade aliased to run is a stub"
);
}
#[test]
fn prescan_rejects_stubbed_lifecycle_exports() {
let wasm = build_wasm_module_with_aliases(&[
("astrid-hook-trigger", 0),
("run", 1),
("astrid-install", 1),
("astrid-upgrade", 1),
]);
assert!(!wasm_exports_contain("astrid-install", &wasm));
assert!(!wasm_exports_contain("astrid-upgrade", &wasm));
}
#[test]
fn prescan_non_trio_name_uses_plain_presence() {
let wasm = build_wasm_module_with_aliases(&[
("astrid-hook-trigger", 0),
("astrid-cron-trigger", 0),
]);
assert!(
wasm_exports_contain("astrid-hook-trigger", &wasm),
"non-trio names take face value even if shared"
);
assert!(wasm_exports_contain("astrid-cron-trigger", &wasm));
}
#[test]
fn prescan_ignores_non_func_run_export() {
use wasm_encoder::{
ExportKind, ExportSection, GlobalSection, GlobalType, Module, TypeSection, ValType,
};
let mut module = Module::new();
let mut types = TypeSection::new();
types.ty().function(vec![], vec![]);
module.section(&types);
let mut globals = GlobalSection::new();
globals.global(
GlobalType {
val_type: ValType::I32,
mutable: false,
shared: false,
},
&wasm_encoder::ConstExpr::i32_const(42),
);
module.section(&globals);
let mut exports = ExportSection::new();
exports.export("run", ExportKind::Global, 0);
module.section(&exports);
let wasm = module.finish();
assert!(
!wasm_exports_contain_run(&wasm),
"global named 'run' should not be detected as a function export"
);
}
async fn build_bundle_async_safe(ph: astrid_core::dirs::PrincipalHome) -> PrincipalVfsBundle {
build_principal_vfs_bundle_at(&ph).await
}
#[tokio::test(flavor = "multi_thread")]
async fn build_bundle_returns_empty_for_unregistered_principal() {
let tmp = tempfile::tempdir().unwrap();
let ph = astrid_core::dirs::PrincipalHome::from_path(tmp.path().join("home/mallory"));
let bundle = build_bundle_async_safe(ph).await;
assert!(bundle.home.is_none(), "unknown principal: no home mount");
assert!(bundle.tmp.is_none());
}
#[tokio::test(flavor = "multi_thread")]
async fn build_bundle_populated_for_registered_principal() {
let tmp = tempfile::tempdir().unwrap();
let alice_root = tmp.path().join("home/alice");
let ph = astrid_core::dirs::PrincipalHome::from_path(&alice_root);
ph.ensure().unwrap();
let alice_canonical = alice_root.canonicalize().unwrap();
let bundle = build_bundle_async_safe(ph).await;
let home = bundle.home.as_ref().expect("home mount present");
assert_eq!(home.root, alice_canonical);
let tmp_mount = bundle.tmp.as_ref().expect("tmp mount present");
assert_eq!(tmp_mount.root, alice_canonical.join(".local").join("tmp"));
}
#[tokio::test(flavor = "multi_thread")]
async fn build_bundle_isolates_distinct_principals() {
let tmp = tempfile::tempdir().unwrap();
let alice_root = tmp.path().join("home/alice");
let bob_root = tmp.path().join("home/bob");
let alice_ph = astrid_core::dirs::PrincipalHome::from_path(&alice_root);
let bob_ph = astrid_core::dirs::PrincipalHome::from_path(&bob_root);
alice_ph.ensure().unwrap();
bob_ph.ensure().unwrap();
let alice_canonical = alice_root.canonicalize().unwrap();
let bob_canonical = bob_root.canonicalize().unwrap();
let alice_bundle = build_bundle_async_safe(alice_ph).await;
let bob_bundle = build_bundle_async_safe(bob_ph).await;
let alice_home = &alice_bundle.home.as_ref().unwrap().root;
let bob_home = &bob_bundle.home.as_ref().unwrap().root;
assert_ne!(
alice_home, bob_home,
"distinct principals, distinct home roots"
);
assert_eq!(alice_home, &alice_canonical);
assert_eq!(bob_home, &bob_canonical);
std::fs::write(alice_home.join("note.txt"), b"alice").unwrap();
std::fs::write(bob_home.join("note.txt"), b"bob").unwrap();
assert_eq!(
std::fs::read(alice_home.join("note.txt")).unwrap(),
b"alice"
);
assert_eq!(std::fs::read(bob_home.join("note.txt")).unwrap(), b"bob");
}
#[test]
fn open_capsule_log_returns_none_for_unregistered_principal() {
let tmp = tempfile::tempdir().unwrap();
let ph = astrid_core::dirs::PrincipalHome::from_path(tmp.path().join("home/mallory"));
assert!(open_capsule_log_at(&ph, "some-capsule", false).is_none());
assert!(open_capsule_log_at(&ph, "some-capsule", true).is_none());
assert!(
!ph.root().exists(),
"must not auto-mkdir an unregistered principal's home"
);
}
#[test]
fn open_capsule_log_opens_file_under_principal_tree() {
let tmp = tempfile::tempdir().unwrap();
let alice_root = tmp.path().join("home/alice");
let ph = astrid_core::dirs::PrincipalHome::from_path(&alice_root);
ph.ensure().unwrap();
let file = open_capsule_log_at(&ph, "my-capsule", false).expect("open ok");
let log_dir = ph.log_dir().join("my-capsule");
assert!(log_dir.is_dir(), "log dir auto-created under alice's tree");
let today = today_date_string();
let expected = log_dir.join(format!("{today}.log"));
assert!(
expected.is_file(),
"today's log file opened at {expected:?}"
);
use std::io::Write;
{
let mut f = file.lock().unwrap();
writeln!(f, "hello-alice").unwrap();
f.flush().unwrap();
}
let contents = std::fs::read_to_string(&expected).unwrap();
assert!(contents.contains("hello-alice"));
}
#[test]
fn open_capsule_log_isolates_distinct_principals() {
let tmp = tempfile::tempdir().unwrap();
let alice_root = tmp.path().join("home/alice");
let bob_root = tmp.path().join("home/bob");
let alice_ph = astrid_core::dirs::PrincipalHome::from_path(&alice_root);
let bob_ph = astrid_core::dirs::PrincipalHome::from_path(&bob_root);
alice_ph.ensure().unwrap();
bob_ph.ensure().unwrap();
let alice_log = open_capsule_log_at(&alice_ph, "shared-capsule", false).unwrap();
let bob_log = open_capsule_log_at(&bob_ph, "shared-capsule", false).unwrap();
use std::io::Write;
writeln!(alice_log.lock().unwrap(), "alice-line").unwrap();
writeln!(bob_log.lock().unwrap(), "bob-line").unwrap();
let today = today_date_string();
let alice_file = alice_ph
.log_dir()
.join("shared-capsule")
.join(format!("{today}.log"));
let bob_file = bob_ph
.log_dir()
.join("shared-capsule")
.join(format!("{today}.log"));
let alice_contents = std::fs::read_to_string(&alice_file).unwrap();
let bob_contents = std::fs::read_to_string(&bob_file).unwrap();
assert!(alice_contents.contains("alice-line"));
assert!(!alice_contents.contains("bob-line"));
assert!(bob_contents.contains("bob-line"));
assert!(!bob_contents.contains("alice-line"));
}
#[test]
fn open_capsule_log_with_prune_does_not_delete_todays_file() {
let tmp = tempfile::tempdir().unwrap();
let alice_root = tmp.path().join("home/alice");
let ph = astrid_core::dirs::PrincipalHome::from_path(&alice_root);
ph.ensure().unwrap();
let f1 = open_capsule_log_at(&ph, "c", true).unwrap();
use std::io::Write;
writeln!(f1.lock().unwrap(), "pre-prune line").unwrap();
f1.lock().unwrap().flush().unwrap();
drop(f1);
let f2 = open_capsule_log_at(&ph, "c", true).unwrap();
drop(f2);
let today = today_date_string();
let path = ph.log_dir().join("c").join(format!("{today}.log"));
let contents = std::fs::read_to_string(&path).unwrap();
assert!(contents.contains("pre-prune line"));
}
#[test]
fn civil_from_days_epoch() {
assert_eq!(civil_from_days(0), (1970, 1, 1));
}
#[test]
fn civil_from_days_known_dates() {
assert_eq!(civil_from_days(59), (1970, 3, 1)); assert_eq!(civil_from_days(365), (1971, 1, 1)); assert_eq!(civil_from_days(11_016), (2000, 2, 29)); assert_eq!(civil_from_days(20_564), (2026, 4, 21)); }
#[test]
fn today_date_string_matches_civil_from_days() {
let secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let days = secs / 86400;
let (y, m, d) = civil_from_days(days as i64);
assert_eq!(today_date_string(), format!("{y:04}-{m:02}-{d:02}"));
}
}
#[cfg(test)]
mod epoch_integration_tests {
use super::{
EpochAction, INTERCEPTOR_FUEL_BUDGET, MAX_NO_YIELD_WINDOWS, build_wasmtime_engine,
epoch_decision, spawn_epoch_ticker,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use wasmtime::{Engine, Linker, Module, Store, StoreLimits, StoreLimitsBuilder, Trap};
struct RunLoopTestState {
recv_yielded: bool,
no_yield_windows: u32,
}
fn apply_epoch_bound(store: &mut Store<RunLoopTestState>, window_ticks: u64) {
store.set_fuel(u64::MAX).expect("fuel enabled");
store.set_epoch_deadline(window_ticks);
store.epoch_deadline_callback(move |mut cx| {
let st = cx.data_mut();
let (action, recv_yielded, no_yield_windows) = epoch_decision(
st.recv_yielded,
st.no_yield_windows,
window_ticks,
MAX_NO_YIELD_WINDOWS,
);
st.recv_yielded = recv_yielded;
st.no_yield_windows = no_yield_windows;
Ok(match action {
EpochAction::Yield(ticks) => wasmtime::UpdateDeadline::Yield(ticks),
EpochAction::Interrupt => wasmtime::UpdateDeadline::Interrupt,
})
});
}
fn assert_interrupt(err: &wasmtime::Error) {
let trap = err.root_cause().downcast_ref::<Trap>();
assert_eq!(
trap,
Some(&Trap::Interrupt),
"expected the epoch-interrupt trap (the CPU bound), got: {err:?}"
);
}
fn unit_module(engine: &Engine, wat: &str) -> Module {
Module::new(engine, wat).expect("valid wat module")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn pure_spin_guest_interrupt_trapped_via_production_callback() {
let engine = build_wasmtime_engine().expect("engine");
let module = unit_module(
&engine,
r#"(module (func (export "run") (loop $l (br $l))))"#,
);
let mut store = Store::new(
&engine,
RunLoopTestState {
recv_yielded: false,
no_yield_windows: 0,
},
);
apply_epoch_bound(&mut store, 1);
let linker = Linker::new(&engine);
let instance = linker
.instantiate_async(&mut store, &module)
.await
.expect("instantiate");
let run = instance
.get_typed_func::<(), ()>(&mut store, "run")
.expect("run export");
let ticker = spawn_epoch_ticker(&engine);
let res =
tokio::time::timeout(Duration::from_secs(10), run.call_async(&mut store, ())).await;
drop(ticker);
let outcome = res.expect("pure-spin guest must not starve the worker / hang");
let err = outcome.expect_err("pure-spin guest must TRAP, not run forever");
assert_interrupt(&err);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn no_recv_spinner_terminates_and_coexists() {
let engine = build_wasmtime_engine().expect("engine");
let module = unit_module(
&engine,
r#"(module (func (export "run") (loop $l (br $l))))"#,
);
let mut store = Store::new(
&engine,
RunLoopTestState {
recv_yielded: false,
no_yield_windows: 0,
},
);
apply_epoch_bound(&mut store, 1);
let linker = Linker::new(&engine);
let instance = linker
.instantiate_async(&mut store, &module)
.await
.expect("instantiate");
let run = instance
.get_typed_func::<(), ()>(&mut store, "run")
.expect("run export");
let ticker = spawn_epoch_ticker(&engine);
let progress = Arc::new(AtomicU64::new(0));
let p = progress.clone();
let probe = tokio::spawn(async move {
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(20)).await;
p.fetch_add(1, Ordering::Relaxed);
}
});
let spin = tokio::time::timeout(Duration::from_secs(10), run.call_async(&mut store, ()));
let outcome = spin
.await
.expect("no-recv spinner must not hang — its future must resolve");
let err = outcome.expect_err("no-recv spinner must be Interrupt-trapped");
assert_interrupt(&err);
let _ = tokio::time::timeout(Duration::from_secs(2), probe).await;
let ticks = progress.load(Ordering::Relaxed);
drop(ticker);
assert_eq!(
ticks, 10,
"the concurrent probe must complete — the spinner must not wedge the runtime (got {ticks}/10)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn recv_yielding_guest_survives_many_windows() {
let engine = build_wasmtime_engine().expect("engine");
let module = unit_module(
&engine,
r#"(module
(import "host" "recv" (func $recv))
(func (export "run")
(loop $l
(call $recv)
(drop (i32.add (i32.const 1) (i32.const 2)))
(br $l))))"#,
);
let mut store = Store::new(
&engine,
RunLoopTestState {
recv_yielded: false,
no_yield_windows: 0,
},
);
apply_epoch_bound(&mut store, 1);
let mut linker: Linker<RunLoopTestState> = Linker::new(&engine);
linker
.func_wrap(
"host",
"recv",
|mut caller: wasmtime::Caller<'_, RunLoopTestState>| {
caller.data_mut().recv_yielded = true;
},
)
.expect("wire recv import");
let instance = linker
.instantiate_async(&mut store, &module)
.await
.expect("instantiate");
let run = instance
.get_typed_func::<(), ()>(&mut store, "run")
.expect("run export");
let ticker = spawn_epoch_ticker(&engine);
let res =
tokio::time::timeout(Duration::from_millis(1500), run.call_async(&mut store, ())).await;
drop(ticker);
assert!(
res.is_err(),
"a recv-yielding guest must NEVER trap — it should still be running \
when the wall-clock budget elapses, but it returned: {res:?}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn memory_cap_enforced_at_instantiation() {
struct MemState {
limits: StoreLimits,
}
let engine = build_wasmtime_engine().expect("engine");
let cap = 64 * 1024;
let over = unit_module(&engine, r#"(module (memory (export "m") 3))"#);
let mut store = Store::new(
&engine,
MemState {
limits: StoreLimitsBuilder::new().memory_size(cap).build(),
},
);
store.limiter(|s| &mut s.limits);
store.set_fuel(INTERCEPTOR_FUEL_BUDGET).expect("fuel");
store.set_epoch_deadline(u64::MAX);
let linker = Linker::new(&engine);
let over_res = linker.instantiate_async(&mut store, &over).await;
assert!(
over_res.is_err(),
"initial memory above the cap MUST fail at instantiation"
);
let ok = unit_module(&engine, r#"(module (memory (export "m") 1))"#);
let mut store = Store::new(
&engine,
MemState {
limits: StoreLimitsBuilder::new().memory_size(cap).build(),
},
);
store.limiter(|s| &mut s.limits);
store.set_fuel(INTERCEPTOR_FUEL_BUDGET).expect("fuel");
store.set_epoch_deadline(u64::MAX);
linker
.instantiate_async(&mut store, &ok)
.await
.expect("a within-cap initial memory MUST instantiate");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fuel_delta_is_exact_and_deterministic() {
let engine = build_wasmtime_engine().expect("engine");
let module = unit_module(
&engine,
r#"(module
(func (export "count") (param i32) (result i32)
(local $i i32) (local $acc i32)
(block $done
(loop $l
(br_if $done (i32.ge_s (local.get $i) (local.get 0)))
(local.set $acc (i32.add (local.get $acc) (i32.const 1)))
(local.set $i (i32.add (local.get $i) (i32.const 1)))
(br $l)))
(local.get $acc)))"#,
);
async fn run_n(engine: &Engine, module: &Module, n: i32) -> (i32, u64) {
let mut store = Store::new(engine, ());
store.set_fuel(INTERCEPTOR_FUEL_BUDGET).expect("fuel");
store.set_epoch_deadline(u64::MAX);
let linker = Linker::new(engine);
let instance = linker
.instantiate_async(&mut store, module)
.await
.expect("instantiate");
let count = instance
.get_typed_func::<i32, i32>(&mut store, "count")
.expect("count export");
let out = count.call_async(&mut store, n).await.expect("call");
let after = store.get_fuel().expect("fuel enabled");
(out, INTERCEPTOR_FUEL_BUDGET.saturating_sub(after))
}
let (out_a, used_a1) = run_n(&engine, &module, 1000).await;
let (out_a2, used_a2) = run_n(&engine, &module, 1000).await;
let (_out_b, used_b) = run_n(&engine, &module, 2000).await;
assert_eq!(out_a, 1000, "guest must compute the loop result");
assert_eq!(out_a2, 1000);
assert_eq!(
used_a1, used_a2,
"fuel delta must be deterministic for identical guest work"
);
assert!(
used_b > used_a1,
"fuel delta must grow with work: used(2000)={used_b} \
must exceed used(1000)={used_a1}"
);
assert!(
used_a1 > 0 && used_a1 < INTERCEPTOR_FUEL_BUDGET,
"fuel delta must be a real, bounded count: {used_a1}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exempt_run_loop_is_unmetered() {
let engine = build_wasmtime_engine().expect("engine");
let module = unit_module(
&engine,
r#"(module
(func (export "count") (param i32) (result i32)
(local $i i32) (local $acc i32)
(block $done
(loop $l
(br_if $done (i32.ge_s (local.get $i) (local.get 0)))
(local.set $acc (i32.add (local.get $acc) (i32.const 1)))
(local.set $i (i32.add (local.get $i) (i32.const 1)))
(br $l)))
(local.get $acc)))"#,
);
let heavy: i32 = 5_000_000;
let mut store = Store::new(&engine, ());
store.set_fuel(u64::MAX).expect("fuel"); store.set_epoch_deadline(u64::MAX); let linker = Linker::new(&engine);
let instance = linker
.instantiate_async(&mut store, &module)
.await
.expect("instantiate");
let count = instance
.get_typed_func::<i32, i32>(&mut store, "count")
.expect("count export");
let ticker = spawn_epoch_ticker(&engine);
let out = count
.call_async(&mut store, heavy)
.await
.expect("an exempt (u64::MAX, no-callback) run-loop must NOT trap");
drop(ticker);
assert_eq!(out, heavy, "exempt guest must complete the full workload");
}
}