use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use bytes::Bytes;
use wasmtime::{Config, Engine, Module, Store};
use super::capability_linker::{ObserverCapability, ObserverCapabilityLinker};
use super::{ObserverCapToken, ObserverContext, ObserverError, ObserverHost};
/// Fuel budget installed by [`WasmtimeObserverEngineConfig::deterministic_replay`].
///
/// The test module pins this value at compile time to the inclusive range
/// `1_000_000 ..= 1_000_000_000`.
// NOTE(review): the `V0_12` suffix presumably names the release that fixed
// this value — confirm before changing it.
pub const DEFAULT_OBSERVER_FUEL_BUDGET_V0_12: u64 = 100_000_000;
/// Settings used to build the wasmtime engine behind [`WasmtimeObserverHost`].
#[derive(Debug, Clone)]
pub struct WasmtimeObserverEngineConfig {
/// Whether fuel metering is requested.
///
/// NOTE(review): outside the tests, nothing in the visible code reads this
/// flag; the engine is always built from the `ChainNonAffecting` profile.
/// Confirm whether `false` is meant to be honoured.
pub fuel_metering: bool,
/// Fuel budget forwarded to the `ChainNonAffecting` engine profile.
pub fuel_budget: u64,
}
impl WasmtimeObserverEngineConfig {
pub fn deterministic_replay() -> Self {
Self {
fuel_metering: true,
fuel_budget: DEFAULT_OBSERVER_FUEL_BUDGET_V0_12,
}
}
pub fn with_fuel_budget(mut self, fuel_budget: u64) -> Self {
self.fuel_budget = fuel_budget;
self
}
pub fn to_config(&self) -> Config {
crate::wasm_runtime_common::config_for_profile(
&crate::wasm_runtime_common::EngineProfile::ChainNonAffecting {
fuel_budget: self.fuel_budget,
},
)
}
}
impl Default for WasmtimeObserverEngineConfig {
fn default() -> Self {
Self::deterministic_replay()
}
}
/// Observer host that executes at most one registered wasm module on a
/// wasmtime [`Engine`].
#[derive(Debug)]
pub struct WasmtimeObserverHost {
/// Engine returned by `wasm_runtime_common::build_engine` in `with_config`.
engine: Engine,
/// Capability linker created with `ObserverCapabilityLinker::deny_by_default`.
linker: ObserverCapabilityLinker,
/// Module installed by `register_module`; `None` until a registration succeeds.
registered_module: Option<Module>,
/// Fuel budget returned by `build_engine`; seeded into a fresh store on every invoke.
fuel_budget: u64,
/// Number of `invoke` calls that returned an error; incremented through `&self`.
trap_count: AtomicU64,
}
impl WasmtimeObserverHost {
    /// Builds a host from the deterministic-replay configuration with no
    /// capabilities registered.
    pub fn with_deterministic_replay_config() -> Result<Self, ObserverHostError> {
        let replay = WasmtimeObserverEngineConfig::deterministic_replay();
        Self::with_config(&replay, &[])
    }

    /// Builds a host whose engine uses the `ChainNonAffecting` profile with
    /// `config.fuel_budget`, and whose linker is created by
    /// `ObserverCapabilityLinker::deny_by_default` over `capabilities`.
    ///
    /// Only `config.fuel_budget` is read; `config.fuel_metering` is not.
    ///
    /// # Errors
    ///
    /// Returns [`ObserverHostError::EngineInitFailed`] when the engine cannot
    /// be built, and propagates any error from the linker construction.
    pub fn with_config(
        config: &WasmtimeObserverEngineConfig,
        capabilities: &[Arc<dyn ObserverCapability>],
    ) -> Result<Self, ObserverHostError> {
        let profile = crate::wasm_runtime_common::EngineProfile::ChainNonAffecting {
            fuel_budget: config.fuel_budget,
        };
        let (engine, fuel_budget) = match crate::wasm_runtime_common::build_engine(&profile) {
            Ok(built) => built,
            Err(cause) => {
                return Err(ObserverHostError::EngineInitFailed {
                    reason: cause.to_string(),
                });
            }
        };

        // The linker borrows the engine, so it must be created before the
        // engine is moved into the host.
        let linker = ObserverCapabilityLinker::deny_by_default(&engine, capabilities)?;

        Ok(Self {
            engine,
            linker,
            registered_module: None,
            fuel_budget,
            trap_count: AtomicU64::new(0),
        })
    }

    /// Validates `bytes` through `register_module_common` (expected digest plus
    /// the observer allow/deny import prefix lists) and, on success, stores the
    /// resulting module, replacing any previously registered one.
    ///
    /// # Errors
    ///
    /// Any registration error is converted into [`ObserverHostError`]; the
    /// stored module is left untouched in that case.
    pub fn register_module(
        &mut self,
        bytes: Bytes,
        expected_digest: blake3::Hash,
    ) -> Result<(), ObserverHostError> {
        let validated = crate::wasm_runtime_common::register_module_common(
            &self.engine,
            &bytes,
            expected_digest,
            super::capability_linker::ALLOWED_IMPORT_MODULE_PREFIXES,
            super::capability_linker::DENIED_IMPORT_MODULE_PREFIXES,
            "only `arkhe:observer/*` permitted",
        )?;
        self.registered_module = Some(validated);
        Ok(())
    }

    /// Borrows the underlying wasmtime engine.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }

    /// Borrows the capability linker built at construction time.
    pub fn capability_linker(&self) -> &ObserverCapabilityLinker {
        &self.linker
    }

    /// Reports whether a module is currently registered.
    pub fn has_registered_module(&self) -> bool {
        self.registered_module.is_some()
    }

    /// Fuel budget seeded into the store at the start of every invocation.
    pub fn fuel_budget(&self) -> u64 {
        self.fuel_budget
    }

    /// Number of invocations that have returned an error so far.
    pub fn trap_count(&self) -> u64 {
        self.trap_count.load(Ordering::Relaxed)
    }
}
impl ObserverHost for WasmtimeObserverHost {
    /// Runs the registered module, if there is one.
    ///
    /// With no module registered the call is a pass-through that returns
    /// `Ok(())` and leaves the trap counter alone. Otherwise the module is
    /// executed and every `Err` outcome bumps the trap counter by one before
    /// being returned to the caller.
    fn invoke(&self, ctx: &mut ObserverContext<'_>) -> Result<(), ObserverError> {
        let Some(module) = self.registered_module.as_ref() else {
            return Ok(());
        };

        let outcome = self.run_wasm_invoke(module, ctx);
        if outcome.is_err() {
            self.trap_count.fetch_add(1, Ordering::Relaxed);
        }
        outcome
    }
}
/// Compile-time guard: `ObserverContext` must stay exactly the size of a
/// `&[ObserverCapToken]` slice reference. If the sizes ever diverge, constant
/// evaluation panics and the build fails with the message below.
const _OBSERVER_CONTEXT_SHAPE_CHECK: () = assert!(
    core::mem::size_of::<ObserverContext<'static>>()
        == core::mem::size_of::<&[ObserverCapToken]>(),
    "ObserverContext gained a field — review chain-non-affecting \
     clause 2 invariant before continuing"
);
impl WasmtimeObserverHost {
fn run_wasm_invoke(
&self,
module: &Module,
ctx: &mut ObserverContext<'_>,
) -> Result<(), ObserverError> {
let store_data = super::capability_linker::ObserverStoreData::with_capabilities(
ctx.capabilities.iter().copied(),
)
.with_initial_fuel(self.fuel_budget);
let mut store = Store::new(&self.engine, store_data);
store
.set_fuel(self.fuel_budget)
.map_err(|_| ObserverError::Trapped("fuel seed failed at invoke entry"))?;
let inst = self
.linker
.linker()
.instantiate(&mut store, module)
.map_err(|_| ObserverError::Trapped("observer module instantiation failed"))?;
let entry = inst
.get_typed_func::<(), ()>(&mut store, "observer")
.map_err(|_| {
ObserverError::Trapped(
"observer module missing `observer` export (signature `() -> ()`)",
)
})?;
match entry.call(&mut store, ()) {
Ok(()) => Ok(()),
Err(e) => {
let s = format!("{e:?}");
if s.contains("all fuel consumed") || s.contains("OutOfFuel") {
Err(ObserverError::BudgetExceeded)
} else if s.contains("called without PgWrite capability") {
Err(ObserverError::CapabilityDenied(ObserverCapToken::PgWrite))
} else {
Err(ObserverError::Trapped(
"observer trapped during wasm execution",
))
}
}
}
}
}
/// Errors raised while constructing a [`WasmtimeObserverHost`] or registering
/// a module with it.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum ObserverHostError {
/// The wasmtime engine could not be built; `reason` is the rendered cause.
#[error("wasmtime engine initialisation failed: {reason}")]
EngineInitFailed {
reason: String,
},
/// Linker setup failed.
// NOTE(review): not constructed in the visible code; presumably produced by
// the capability linker's error conversion — confirm.
#[error("wasmtime linker setup failed: {reason}")]
LinkerSetupFailed {
reason: String,
},
/// Mapped from `RegistrationError::ParseFailed`.
#[error("observer module parse failed: {reason}")]
ModuleParseFailed {
reason: String,
},
/// Mapped from `RegistrationError::ImportRejected`; `name` identifies the
/// rejected import.
#[error("observer module import rejected: {name} — {reason}")]
ImportRejected {
name: String,
reason: String,
},
/// Mapped from `RegistrationError::DigestMismatch`: the digest of the
/// supplied bytes (`actual`) differs from the one the caller passed (`expected`).
#[error("observer module digest mismatch — expected {expected:?}, actual {actual:?}")]
DigestMismatch {
expected: blake3::Hash,
actual: blake3::Hash,
},
}
impl From<crate::wasm_runtime_common::RegistrationError> for ObserverHostError {
fn from(e: crate::wasm_runtime_common::RegistrationError) -> Self {
match e {
crate::wasm_runtime_common::RegistrationError::DigestMismatch { expected, actual } => {
ObserverHostError::DigestMismatch { expected, actual }
}
crate::wasm_runtime_common::RegistrationError::ParseFailed { reason } => {
ObserverHostError::ModuleParseFailed { reason }
}
crate::wasm_runtime_common::RegistrationError::ImportRejected { name, reason } => {
ObserverHostError::ImportRejected { name, reason }
}
}
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use crate::observer_host::ObserverCapToken;
#[test]
fn deterministic_replay_config_pins_fuel_metering_and_budget() {
let cfg = WasmtimeObserverEngineConfig::deterministic_replay();
assert!(cfg.fuel_metering);
assert_eq!(cfg.fuel_budget, DEFAULT_OBSERVER_FUEL_BUDGET_V0_12);
}
const _ASSERT_FUEL_BUDGET_LOWER: () = assert!(DEFAULT_OBSERVER_FUEL_BUDGET_V0_12 >= 1_000_000);
const _ASSERT_FUEL_BUDGET_UPPER: () =
assert!(DEFAULT_OBSERVER_FUEL_BUDGET_V0_12 <= 1_000_000_000);
#[test]
fn with_fuel_budget_override_works() {
let cfg = WasmtimeObserverEngineConfig::deterministic_replay().with_fuel_budget(50_000_000);
assert_eq!(cfg.fuel_budget, 50_000_000);
assert!(cfg.fuel_metering);
}
#[test]
fn host_records_fuel_budget_at_construction() {
let host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
assert_eq!(host.fuel_budget(), DEFAULT_OBSERVER_FUEL_BUDGET_V0_12);
let host2 = WasmtimeObserverHost::with_config(
&WasmtimeObserverEngineConfig::deterministic_replay().with_fuel_budget(7_777),
&[],
)
.unwrap();
assert_eq!(host2.fuel_budget(), 7_777);
}
#[test]
fn engine_builds_with_deterministic_replay_config() {
let host = WasmtimeObserverHost::with_deterministic_replay_config()
.expect("engine init must succeed under default config");
let _engine = host.engine();
}
#[test]
fn empty_host_pass_through_ok() {
let host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let caps = [ObserverCapToken::PgWrite];
let mut ctx = ObserverContext {
capabilities: &caps,
};
assert!(host.invoke(&mut ctx).is_ok());
}
#[test]
fn trap_count_starts_at_zero() {
let host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
assert_eq!(host.trap_count(), 0);
}
#[test]
fn trap_count_does_not_increment_on_pass_through_invoke() {
let host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let caps: [ObserverCapToken; 0] = [];
let mut ctx = ObserverContext {
capabilities: &caps,
};
for _ in 0..3 {
assert!(host.invoke(&mut ctx).is_ok());
}
assert_eq!(host.trap_count(), 0);
}
#[test]
fn observer_host_error_display_does_not_panic() {
let e = ObserverHostError::EngineInitFailed {
reason: "test reason".into(),
};
assert!(format!("{e}").contains("test reason"));
}
fn digest(bytes: &[u8]) -> blake3::Hash {
blake3::hash(bytes)
}
fn wat_to_bytes(wat: &str) -> Bytes {
Bytes::from(wat::parse_str(wat).expect("valid wat"))
}
#[test]
fn register_module_accepts_zero_import_preamble() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let preamble = Bytes::from_static(&[
0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00, ]);
let d = digest(preamble.as_ref());
host.register_module(preamble, d)
.expect("zero-import preamble passes digest + pre-scan");
assert!(host.has_registered_module());
}
#[test]
fn register_module_accepts_arkhe_observer_pg_write_import() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let bytes = wat_to_bytes(
r#"(module
(import "arkhe:observer/pg" "write"
(func (param i32 i32))))"#,
);
let d = digest(bytes.as_ref());
host.register_module(bytes, d)
.expect("allowed observer import passes registration");
assert!(host.has_registered_module());
}
#[test]
fn register_module_rejects_digest_mismatch() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let preamble = Bytes::from_static(&[0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00]);
let wrong_digest: blake3::Hash = [0xFFu8; 32].into();
let err = host
.register_module(preamble.clone(), wrong_digest)
.expect_err("wrong digest must reject");
match err {
ObserverHostError::DigestMismatch { expected, actual } => {
assert_eq!(expected, wrong_digest);
assert_eq!(actual, digest(preamble.as_ref()));
}
other => panic!("expected DigestMismatch, got {other:?}"),
}
assert!(!host.has_registered_module());
}
#[test]
fn register_module_rejects_wasi_random() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let bytes = wat_to_bytes(
r#"(module
(import "wasi:random/random" "get-random-u64"
(func (result i64))))"#,
);
let d = digest(bytes.as_ref());
let err = host
.register_module(bytes, d)
.expect_err("wasi:random must reject at registration");
assert!(matches!(err, ObserverHostError::ImportRejected { .. }));
assert!(!host.has_registered_module());
}
#[test]
fn register_module_rejects_arkhe_hook_in_observer_context() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let bytes = wat_to_bytes(
r#"(module
(import "arkhe:hook/state" "read"
(func (param i32 i32) (result i32))))"#,
);
let d = digest(bytes.as_ref());
let err = host
.register_module(bytes, d)
.expect_err("arkhe:hook/* must reject in observer context");
assert!(matches!(err, ObserverHostError::ImportRejected { .. }));
assert!(!host.has_registered_module());
}
#[test]
fn register_module_rejects_invalid_bytes() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let bytes = Bytes::from_static(&[0x00, 0x61, 0x73, 0x6d]); let d = digest(bytes.as_ref());
let err = host
.register_module(bytes, d)
.expect_err("invalid bytes must reject");
assert!(matches!(err, ObserverHostError::ModuleParseFailed { .. }));
}
#[test]
fn register_module_digest_check_runs_before_pre_scan() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let bytes = wat_to_bytes(
r#"(module
(import "wasi:random/random" "get-random-u64"
(func (result i64))))"#,
);
let wrong_digest: blake3::Hash = [0xAAu8; 32].into();
let err = host
.register_module(bytes, wrong_digest)
.expect_err("must reject");
assert!(matches!(err, ObserverHostError::DigestMismatch { .. }));
}
#[test]
fn capability_linker_accessor_returns_template() {
let host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let linker = host.capability_linker();
let dbg = format!("{linker:?}");
assert!(dbg.contains("arkhe:observer/"));
}
use crate::observer_host::capability_linker::{MockPgWriteCapability, PgWriteCapability};
#[test]
fn integration_observer_pg_write_records_bytes_chain_unaffected() {
let mock = Arc::new(MockPgWriteCapability::new());
let mock_handle: Arc<MockPgWriteCapability> = Arc::clone(&mock);
let cap: Arc<dyn ObserverCapability> = mock;
let mut host = WasmtimeObserverHost::with_config(
&WasmtimeObserverEngineConfig::deterministic_replay(),
&[cap],
)
.expect("host with PgWrite capability");
let bytes = wat_to_bytes(
r#"(module
(import "arkhe:observer/pg" "write"
(func $w (param i32 i32)))
(memory (export "memory") 1)
(data (i32.const 0) "ROW-PAYLOAD")
(func (export "observer")
i32.const 0
i32.const 11
call $w))"#,
);
let d = digest(bytes.as_ref());
host.register_module(bytes, d).expect("register module");
let caps = [ObserverCapToken::PgWrite];
let mut ctx = ObserverContext {
capabilities: &caps,
};
host.invoke(&mut ctx).expect("observer invocation");
let recorded = mock_handle.recorded();
assert_eq!(recorded.len(), 1, "exactly one pg.write invocation");
assert_eq!(recorded[0], b"ROW-PAYLOAD");
assert_eq!(host.trap_count(), 0, "no traps on success");
}
#[test]
fn integration_observer_pg_write_traps_without_capability() {
let mock = Arc::new(MockPgWriteCapability::new());
let mock_handle: Arc<MockPgWriteCapability> = Arc::clone(&mock);
let cap: Arc<dyn ObserverCapability> = mock;
let mut host = WasmtimeObserverHost::with_config(
&WasmtimeObserverEngineConfig::deterministic_replay(),
&[cap],
)
.unwrap();
let bytes = wat_to_bytes(
r#"(module
(import "arkhe:observer/pg" "write"
(func $w (param i32 i32)))
(memory (export "memory") 1)
(data (i32.const 0) "BLOCKED")
(func (export "observer")
i32.const 0
i32.const 7
call $w))"#,
);
let d = digest(bytes.as_ref());
host.register_module(bytes, d).unwrap();
let caps: [ObserverCapToken; 0] = [];
let mut ctx = ObserverContext {
capabilities: &caps,
};
match host.invoke(&mut ctx) {
Err(ObserverError::CapabilityDenied(ObserverCapToken::PgWrite)) => {}
other => panic!("expected CapabilityDenied(PgWrite), got {other:?}"),
}
assert_eq!(mock_handle.invocation_count(), 0, "mock untouched on deny");
assert_eq!(host.trap_count(), 1, "trap counter incremented");
}
#[test]
fn integration_observer_pg_write_traps_when_no_impl_registered() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let bytes = wat_to_bytes(
r#"(module
(import "arkhe:observer/pg" "write"
(func $w (param i32 i32)))
(memory (export "memory") 1)
(data (i32.const 0) "ORPHAN")
(func (export "observer")
i32.const 0
i32.const 6
call $w))"#,
);
let d = digest(bytes.as_ref());
host.register_module(bytes, d).unwrap();
let caps = [ObserverCapToken::PgWrite];
let mut ctx = ObserverContext {
capabilities: &caps,
};
match host.invoke(&mut ctx) {
Err(ObserverError::Trapped(_)) => {}
other => panic!("expected Trapped (no impl registered), got {other:?}"),
}
assert_eq!(host.trap_count(), 1);
}
#[test]
fn integration_observer_infinite_loop_returns_budget_exceeded() {
let mut host = WasmtimeObserverHost::with_config(
&WasmtimeObserverEngineConfig::deterministic_replay().with_fuel_budget(1_000),
&[],
)
.unwrap();
let bytes = wat_to_bytes(
r#"(module
(func (export "observer")
(loop $forever (br $forever))))"#,
);
let d = digest(bytes.as_ref());
host.register_module(bytes, d).unwrap();
let caps: [ObserverCapToken; 0] = [];
let mut ctx = ObserverContext {
capabilities: &caps,
};
match host.invoke(&mut ctx) {
Err(ObserverError::BudgetExceeded) => {}
other => panic!("expected BudgetExceeded, got {other:?}"),
}
assert_eq!(host.trap_count(), 1);
}
#[test]
fn integration_observer_module_without_observer_export_traps() {
let mut host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let preamble = Bytes::from_static(&[
0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00, ]);
let d = digest(preamble.as_ref());
host.register_module(preamble, d).unwrap();
let caps: [ObserverCapToken; 0] = [];
let mut ctx = ObserverContext {
capabilities: &caps,
};
match host.invoke(&mut ctx) {
Err(ObserverError::Trapped(msg)) => {
assert!(
msg.contains("missing `observer` export"),
"unexpected trap: {msg}"
);
}
other => panic!("expected Trapped(missing observer), got {other:?}"),
}
}
#[test]
fn integration_empty_host_pass_through() {
let host = WasmtimeObserverHost::with_deterministic_replay_config().unwrap();
let caps = [ObserverCapToken::PgWrite];
let mut ctx = ObserverContext {
capabilities: &caps,
};
assert!(host.invoke(&mut ctx).is_ok());
assert_eq!(host.trap_count(), 0);
}
#[test]
fn observer_context_shape_check_holds_at_compile_time() {
let _: () = _OBSERVER_CONTEXT_SHAPE_CHECK;
}
#[test]
fn integration_pg_write_capability_can_be_default_registered() {
let cap: Arc<dyn ObserverCapability> = Arc::new(PgWriteCapability::new());
let host = WasmtimeObserverHost::with_config(
&WasmtimeObserverEngineConfig::deterministic_replay(),
&[cap],
)
.expect("default PgWriteCapability registers");
assert_eq!(host.capability_linker().registered_capability_count(), 1);
}
}