use std::sync::Arc;
use std::time::Instant;
use ahash::HashMap;
use borderless::__private::registers::*;
use borderless::common::{Introduction, Revocation, Symbols};
use borderless::contracts::{BlockCtx, TxCtx};
use borderless::events::Events;
use borderless::{events::CallAction, ContractId};
use borderless::{BlockIdentifier, BorderlessId};
use borderless_kv_store::backend::lmdb::Lmdb;
use borderless_kv_store::Db;
use parking_lot::Mutex;
use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module, Store};
use super::{
code_store::CodeStore,
vm::{self, ContractCommit, VmState},
};
use crate::db::{
action_log::ActionRecord,
logger::{self, print_log_line},
};
use crate::log_shim::*;
use crate::ACTION_TX_REL_SUB_DB;
use crate::{
error::{ErrorKind, Result},
CONTRACT_SUB_DB,
};
/// A [`Runtime`] wrapped in an `Arc<parking_lot::Mutex<_>>` so that one runtime
/// instance can be handed to several owners and locked for exclusive use.
///
/// This is the type produced by [`Runtime::into_shared`].
pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
/// Executes wasm contracts on top of a key-value storage backend `S`.
///
/// The storage backend defaults to [`Lmdb`]; any type implementing [`Db`] can be used.
pub struct Runtime<S: Db = Lmdb> {
    /// Linker that carries the host functions registered under the `env` module.
    linker: Linker<VmState<S>>,
    /// Wasmtime store whose data is the [`VmState`] (registers, storage access).
    store: Store<VmState<S>>,
    /// Engine used to compile modules; the linker and the store are created from it.
    engine: Engine,
    /// Store used to insert compiled contracts and to look them up by contract-id.
    contract_store: CodeStore<S>,
    /// Source of the per-contract locks taken while a state-changing call is executed.
    mutability_lock: MutLock,
}
impl<S: Db> Runtime<S> {
/// Creates a new runtime on top of `storage`.
///
/// The constructor
/// 1. creates the sub-databases `CONTRACT_SUB_DB` and `ACTION_TX_REL_SUB_DB`,
/// 2. builds the [`VmState`] from a clone of `storage` and the contract sub-db handle,
/// 3. creates a synchronous wasmtime [`Engine`] with cranelift optimization level `Speed`,
/// 4. registers all host functions under the wasm import module `"env"`,
/// 5. creates the [`Store`] that owns the `VmState`.
///
/// # Errors
///
/// Propagates errors from `create_sub_db`, `Engine::new` and `Linker::func_wrap`.
pub fn new(storage: &S, contract_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
let db_ptr = storage.create_sub_db(CONTRACT_SUB_DB)?;
// The handle of the second sub-db is not needed here; the call is only made for its effect.
// NOTE: the timer is started *after* both sub-databases were created, so the duration
// logged below does not include the sub-db creation.
let _ = storage.create_sub_db(ACTION_TX_REL_SUB_DB)?; let start = Instant::now();
let state = VmState::new(storage.clone(), db_ptr);
let mut config = Config::new();
config.cranelift_opt_level(wasmtime::OptLevel::Speed);
config.async_support(false);
let engine = Engine::new(&config)?;
let mut linker: Linker<VmState<S>> = Linker::new(&engine);
// --- Host functions: logging
linker.func_wrap(
"env",
"print",
|caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
)?;
// --- Host functions: register access
linker.func_wrap(
"env",
"read_register",
|caller: Caller<'_, VmState<S>>, register_id, ptr| {
vm::read_register(caller, register_id, ptr)
},
)?;
linker.func_wrap(
"env",
"register_len",
|caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
)?;
linker.func_wrap(
"env",
"write_register",
|caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
},
)?;
// --- Host functions: storage access
linker.func_wrap(
"env",
"storage_read",
|caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
vm::storage_read(caller, base_key, sub_key, register_id)
},
)?;
linker.func_wrap(
"env",
"storage_write",
|caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
},
)?;
linker.func_wrap(
"env",
"storage_remove",
|caller: Caller<'_, VmState<S>>, base_key, sub_key| {
vm::storage_remove(caller, base_key, sub_key)
},
)?;
linker.func_wrap(
"env",
"storage_has_key",
|caller: Caller<'_, VmState<S>>, base_key, sub_key| {
vm::storage_has_key(caller, base_key, sub_key)
},
)?;
linker.func_wrap(
"env",
"storage_cursor",
|caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
)?;
linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
// --- Host functions: timing (`tic` / `toc`) and `rand`
linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
vm::tic(caller)
})?;
linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
vm::toc(caller)
})?;
linker.func_wrap("env", "rand", vm::rand)?;
// The store takes ownership of the VmState; host functions reach it through the `Caller`.
let store = Store::new(&engine, state);
info!("Initialized runtime in: {:?}", start.elapsed());
Ok(Self {
linker,
store,
engine,
contract_store,
mutability_lock: lock,
})
}
/// Consumes the runtime and returns it behind an `Arc<Mutex<_>>`,
/// so it can be shared between several owners.
pub fn into_shared(self) -> Arc<Mutex<Self>> {
    let guarded = Mutex::new(self);
    Arc::new(guarded)
}
/// Compiles `module_bytes`, validates the exports and stores the module under `contract_id`.
///
/// # Errors
///
/// Fails if the module cannot be compiled, if [`check_module`] rejects it,
/// or if the contract store cannot insert the module.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%contract_id), err))]
pub fn instantiate_contract(
    &mut self,
    contract_id: ContractId,
    module_bytes: &[u8],
) -> Result<()> {
    // Compile first - the export check operates on the compiled module
    let compiled = Module::new(&self.engine, module_bytes)?;

    // Reject modules that do not expose the required entry points
    check_module(&self.engine, &compiled)?;

    // Only validated modules end up in the contract store
    self.contract_store.insert_contract(contract_id, compiled)?;
    Ok(())
}
/// Writes the current block context (block-id and timestamp) into `REGISTER_BLOCK_CTX`.
///
/// # Errors
///
/// Fails if the [`BlockCtx`] cannot be serialized; the register is left untouched in that case.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%block_id), err))]
pub fn set_block(&mut self, block_id: BlockIdentifier, block_timestamp: u64) -> Result<()> {
    let encoded = BlockCtx {
        block_id,
        timestamp: block_timestamp,
    }
    .to_bytes()?;
    let state = self.store.data_mut();
    state.set_register(REGISTER_BLOCK_CTX, encoded);
    Ok(())
}
/// Writes the raw bytes of `executor_id` into `REGISTER_EXECUTOR`.
///
/// This function currently always returns `Ok(())`.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
    let state = self.store.data_mut();
    state.set_register(REGISTER_EXECUTOR, executor_id.into_bytes().to_vec());
    Ok(())
}
/// Executes `action` on the contract `cid` on behalf of `writer`.
///
/// The serialized action is used as input, and the action together with `tx_ctx`
/// is passed along as the commit of kind [`ContractCommit::Action`].
///
/// Returns the events produced by the contract, if there are any.
///
/// # Errors
///
/// Fails if the action or the tx-context cannot be serialized,
/// or if the underlying chain-transaction processing fails.
#[must_use = "You have to handle the output events of this function"]
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
pub fn process_transaction(
    &mut self,
    cid: &ContractId,
    action: CallAction,
    writer: &BorderlessId,
    tx_ctx: TxCtx,
) -> Result<Option<Events>> {
    // Serialize everything up-front, before the values are moved into the commit
    let input = action.to_bytes()?;
    let tx_ctx_bytes = tx_ctx.to_bytes()?;
    let commit = ContractCommit::Action { action, tx_ctx };
    self.process_chain_tx(*cid, input, *writer, tx_ctx_bytes, commit)
}
/// Executes the introduction of a contract on behalf of `writer`.
///
/// The string representation of `introduction.initial_state` is used as input,
/// and the introduction together with `tx_ctx` is passed along as the commit
/// of kind [`ContractCommit::Introduction`]. Events produced by the call are discarded.
///
/// # Errors
///
/// Returns `ErrorKind::InvalidIdType` if the introduction carries an agent-id,
/// and fails if the tx-context cannot be serialized or the processing itself fails.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %introduction.id, %writer), err))]
pub fn process_introduction(
    &mut self,
    introduction: Introduction,
    writer: &BorderlessId,
    tx_ctx: TxCtx,
) -> Result<()> {
    use borderless::prelude::Id;

    // Only contracts can be introduced here - agents are rejected
    let cid = match introduction.id {
        Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
        Id::Contract { contract_id } => contract_id,
    };
    let input = introduction.initial_state.to_string().into_bytes();
    let tx_ctx_bytes = tx_ctx.to_bytes()?;
    let commit = ContractCommit::Introduction {
        introduction,
        tx_ctx,
    };
    self.process_chain_tx(cid, input, *writer, tx_ctx_bytes, commit)
        .map(|_events| ())
}
/// Executes the revocation of a contract on behalf of `writer`.
///
/// The serialized revocation is used as input, and the revocation together with
/// `tx_ctx` is passed along as the commit of kind [`ContractCommit::Revocation`].
/// Events produced by the call are discarded.
///
/// # Errors
///
/// Fails if the revocation or the tx-context cannot be serialized, returns
/// `ErrorKind::InvalidIdType` if the revocation carries an agent-id,
/// and fails if the processing itself fails.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %revocation.id, %writer), err))]
pub fn process_revocation(
    &mut self,
    revocation: Revocation,
    writer: &BorderlessId,
    tx_ctx: TxCtx,
) -> Result<()> {
    use borderless::prelude::Id;

    let input = revocation.to_bytes()?;
    // Only contracts can be revoked here - agents are rejected
    let cid = match revocation.id {
        Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
        Id::Contract { contract_id } => contract_id,
    };
    let tx_ctx_bytes = tx_ctx.to_bytes()?;
    let commit = ContractCommit::Revocation { revocation, tx_ctx };
    self.process_chain_tx(cid, input, *writer, tx_ctx_bytes, commit)
        .map(|_events| ())
}
/// Shared implementation behind `process_transaction`, `process_introduction`
/// and `process_revocation`.
///
/// Steps:
/// 1. Look up the contract instance for `cid` (`ErrorKind::MissingContract` if there is none).
/// 2. Take the per-contract lock from `mutability_lock`; it is held until this function returns.
/// 3. Select the wasm export to call based on the variant of `commit`.
/// 4. Write `input`, `tx_ctx_bytes` and the bytes of `writer` into their registers.
/// 5. Begin a mutable execution, call the export, and finish the execution.
/// 6. Decode `REGISTER_OUTPUT` into [`Events`], if the register holds a value.
///
/// A failing wasm call is *not* returned as an error: it is logged as a warning,
/// the execution is finished via `finish_immutable_exec` and the returned log lines are printed.
///
/// # Errors
///
/// Fails if the contract is missing, if beginning or finishing the execution fails,
/// or if the output register cannot be decoded into [`Events`].
fn process_chain_tx(
&mut self,
cid: ContractId,
input: Vec<u8>,
writer: BorderlessId,
tx_ctx_bytes: Vec<u8>,
commit: ContractCommit,
) -> Result<Option<Events>> {
let instance = self
.contract_store
.get_contract(&cid, &self.engine, &mut self.store, &mut self.linker)?
.ok_or_else(|| ErrorKind::MissingContract { cid })?;
// Lock is taken per contract-id; the guard lives until the end of the function.
let mtx = self.mutability_lock.get_lock(&cid);
let _guard = mtx.lock();
// Name of the wasm export that handles this kind of commit
let contract_method = match &commit {
ContractCommit::Action { .. } => "process_transaction",
ContractCommit::Introduction { .. } => "process_introduction",
ContractCommit::Revocation { .. } => "process_revocation",
};
// Prepare the input registers for the call
self.store.data_mut().set_register(REGISTER_INPUT, input);
self.store
.data_mut()
.set_register(REGISTER_TX_CTX, tx_ctx_bytes);
self.store
.data_mut()
.set_register(REGISTER_WRITER, writer.into_bytes().into());
self.store.data_mut().begin_mutable_exec(cid)?;
match instance
.get_typed_func::<(), ()>(&mut self.store, contract_method)
.and_then(|func| func.call(&mut self.store, ()))
{
Ok(()) => self.store.data_mut().finish_mutable_exec(commit)?,
Err(e) => {
warn!("{contract_method} failed with error: {e}");
// NOTE(review): presumably this discards the changes of the failed call
// instead of committing them - confirm against `VmState::finish_immutable_exec`.
let logs = self.store.data_mut().finish_immutable_exec()?;
for l in logs {
print_log_line(l);
}
}
}
// NOTE(review): the output register is also read after a failed call. Whether it can
// still hold a value from an earlier execution depends on when `VmState` clears its
// registers - TODO confirm.
match self.store.data().get_register(REGISTER_OUTPUT) {
Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
None => Ok(None),
}
}
/// Calls `process_transaction` of contract `cid` inside an immutable execution.
///
/// The call uses the serialized `action` as input, a dummy tx-context
/// and the bytes of `writer` as the writer register.
///
/// A failing wasm call is only logged as a warning; it does not turn into an error.
/// The log lines returned when finishing the execution are dropped.
///
/// # Errors
///
/// Fails if the action or the dummy tx-context cannot be serialized, if the contract
/// is missing, or if beginning or finishing the immutable execution fails.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
pub fn perform_dry_run(
    &mut self,
    cid: &ContractId,
    action: &CallAction,
    writer: &BorderlessId,
) -> Result<()> {
    let input = action.to_bytes()?;
    let tx_ctx = TxCtx::dummy().to_bytes()?;

    let instance = self
        .contract_store
        .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
        .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;

    self.store.data_mut().begin_immutable_exec(*cid)?;

    // Prepare the input registers for the call
    {
        let state = self.store.data_mut();
        state.set_register(REGISTER_INPUT, input);
        state.set_register(REGISTER_TX_CTX, tx_ctx);
        state.set_register(REGISTER_WRITER, writer.into_bytes().into());
    }

    let outcome = instance
        .get_typed_func::<(), ()>(&mut self.store, "process_transaction")
        .and_then(|entry| entry.call(&mut self.store, ()));
    if let Err(e) = outcome {
        warn!("dry-run of process_transaction failed with error: {e}");
    }

    // The returned log lines are intentionally not used here
    self.store.data_mut().finish_immutable_exec()?;
    Ok(())
}
/// Calls `http_get_state` of contract `cid` for the given `path` inside an immutable execution.
///
/// Returns the http status code and the raw result bytes that the contract wrote into
/// its output registers. The status register is interpreted as a big-endian `u16`.
///
/// A failing wasm call is only logged as a warning. The log lines of the execution are
/// printed before the output registers are inspected, so they are not lost when an
/// output register is missing or malformed.
///
/// # Errors
///
/// - `ErrorKind::MissingContract` if there is no contract for `cid`
/// - `ErrorKind::MissingRegisterValue` if the status or the result register is empty
/// - `ErrorKind::InvalidRegisterValue` if the status register is not exactly two bytes long
/// - any error from beginning or finishing the immutable execution
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
pub fn http_get_state(&mut self, cid: &ContractId, path: String) -> Result<(u16, Vec<u8>)> {
    let instance = self
        .contract_store
        .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
        .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;

    self.store.data_mut().begin_immutable_exec(*cid)?;
    self.store
        .data_mut()
        .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());

    if let Err(e) = instance
        .get_typed_func::<(), ()>(&mut self.store, "http_get_state")
        .and_then(|func| func.call(&mut self.store, ()))
    {
        warn!("http_get_state failed with error: {e}");
    }

    // Print the logs right away - they are most valuable when something below goes wrong
    let log = self.store.data_mut().finish_immutable_exec()?;
    for l in log {
        logger::print_log_line(l);
    }

    let status = self
        .store
        .data()
        .get_register(REGISTER_OUTPUT_HTTP_STATUS)
        .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
    // The register content is written by the contract, so its length must not be trusted
    let status: [u8; 2] = status
        .try_into()
        .map_err(|_| ErrorKind::InvalidRegisterValue {
            register: "http-status",
            expected_type: "u16",
        })?;
    let status = u16::from_be_bytes(status);

    let result = self
        .store
        .data()
        .get_register(REGISTER_OUTPUT_HTTP_RESULT)
        .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;

    Ok((status, result))
}
/// Calls `http_post_action` of contract `cid` inside an immutable execution, to convert an
/// http request (`path` + `payload`, sent by `writer`) into a [`CallAction`].
///
/// The status register is interpreted as a big-endian `u16`:
/// - status `200`: the result register is decoded as [`CallAction`] → `Ok(Ok(action))`
/// - otherwise: the result register is decoded as utf-8 string → `Ok(Err((status, message)))`
///
/// A failing wasm call is only logged. The log lines of the execution are printed before
/// the output registers are inspected, so they are not lost when an output register is
/// missing or malformed.
///
/// # Errors
///
/// - `ErrorKind::MissingContract` if there is no contract for `cid`
/// - `ErrorKind::MissingRegisterValue` if the status or the result register is empty
/// - `ErrorKind::InvalidRegisterValue` if the status register is not exactly two bytes long,
///   or if the error message is not valid utf-8
/// - any error from decoding the [`CallAction`], or from beginning or finishing
///   the immutable execution
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
pub fn http_post_action(
    &mut self,
    cid: &ContractId,
    path: String,
    payload: Vec<u8>,
    writer: &BorderlessId,
) -> Result<std::result::Result<CallAction, (u16, String)>> {
    let instance = self
        .contract_store
        .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
        .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;

    self.store.data_mut().begin_immutable_exec(*cid)?;

    // Prepare the input registers for the call
    self.store
        .data_mut()
        .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
    self.store
        .data_mut()
        .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
    self.store
        .data_mut()
        .set_register(REGISTER_WRITER, writer.into_bytes().into());

    if let Err(e) = instance
        .get_typed_func::<(), ()>(&mut self.store, "http_post_action")
        .and_then(|func| func.call(&mut self.store, ()))
    {
        error!("http_post_action failed with error: {e}");
    }

    // Print the logs right away - they are most valuable when something below goes wrong
    let log = self.store.data_mut().finish_immutable_exec()?;
    for l in log {
        logger::print_log_line(l);
    }

    let status = self
        .store
        .data()
        .get_register(REGISTER_OUTPUT_HTTP_STATUS)
        .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
    // The register content is written by the contract, so its length must not be trusted
    let status: [u8; 2] = status
        .try_into()
        .map_err(|_| ErrorKind::InvalidRegisterValue {
            register: "http-status",
            expected_type: "u16",
        })?;
    let status = u16::from_be_bytes(status);

    let result = self
        .store
        .data()
        .get_register(REGISTER_OUTPUT_HTTP_RESULT)
        .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;

    if status == 200 {
        let action = CallAction::from_bytes(&result)?;
        Ok(Ok(action))
    } else {
        let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
            register: "http-result",
            expected_type: "string",
        })?;
        Ok(Err((status, error)))
    }
}
/// Calls `get_symbols` of contract `cid` inside an immutable execution and decodes
/// the output register into [`Symbols`].
///
/// Returns `Ok(None)` if the output register holds no value after the call.
/// A failing wasm call is only logged; the log lines of the execution are dropped.
///
/// # Errors
///
/// Fails if the contract is missing, if beginning or finishing the immutable execution
/// fails, or if the output register cannot be decoded into [`Symbols`].
pub fn get_symbols(&mut self, cid: &ContractId) -> Result<Option<Symbols>> {
    let instance = self
        .contract_store
        .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
        .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;

    self.store.data_mut().begin_immutable_exec(*cid)?;

    let outcome = instance
        .get_typed_func::<(), ()>(&mut self.store, "get_symbols")
        .and_then(|entry| entry.call(&mut self.store, ()));
    if let Err(e) = outcome {
        error!("get_symbols failed with error: {e}");
    }

    self.store.data_mut().finish_immutable_exec()?;

    match self.store.data().get_register(REGISTER_OUTPUT) {
        Some(bytes) => Ok(Some(Symbols::from_bytes(&bytes)?)),
        None => Ok(None),
    }
}
/// Reads the action record at position `idx` for contract `cid` from the vm-state.
///
/// The lookup is delegated to `VmState::read_action`.
pub fn read_action(&self, cid: &ContractId, idx: usize) -> Result<Option<ActionRecord>> {
    let state = self.store.data();
    state.read_action(cid, idx)
}
/// Returns the number of actions for contract `cid`, as reported by `VmState::len_actions`.
pub fn len_actions(&self, cid: &ContractId) -> Result<u64> {
    let state = self.store.data();
    state.len_actions(cid)
}
/// Returns the contract-ids reported by the contract store's `available_contracts`.
pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
    let contracts = &self.contract_store;
    contracts.available_contracts()
}
}
/// A shareable lock without payload; one of these exists per contract-id in [`MutLock`].
type Lock = Arc<Mutex<()>>;

/// Hands out one [`Lock`] per contract-id.
///
/// Cloning a `MutLock` clones the `Arc` around the inner map, so all clones
/// hand out the same locks.
#[derive(Clone, Default)]
pub struct MutLock {
    /// Contract-id → lock; entries are created on first access in [`MutLock::get_lock`].
    map: Arc<Mutex<HashMap<ContractId, Lock>>>,
}

impl MutLock {
    /// Returns the lock that belongs to `cid`, creating it if it does not exist yet.
    ///
    /// The inner map is only locked for the duration of this lookup.
    pub fn get_lock(&self, cid: &ContractId) -> Lock {
        let mut registry = self.map.lock();
        registry.entry(*cid).or_default().clone()
    }
}
/// Checks that `module` exports every function the runtime calls,
/// and that each of them takes no parameters and returns nothing.
///
/// # Errors
///
/// For the first offending export (in the order listed below):
/// - `ErrorKind::MissingExport` if the export does not exist
/// - `ErrorKind::InvalidExport` if the export is not a function
/// - `ErrorKind::InvalidFuncType` if the function signature is not `() -> ()`
fn check_module(engine: &Engine, module: &Module) -> Result<()> {
    const REQUIRED_EXPORTS: [&str; 6] = [
        "process_transaction",
        "process_introduction",
        "process_revocation",
        "http_get_state",
        "http_post_action",
        "get_symbols",
    ];

    // Signature every required export has to match: no params, no results
    let expected_sig = FuncType::new(engine, [], []);

    for func in REQUIRED_EXPORTS {
        match module.get_export(func) {
            Some(ExternType::Func(sig)) if sig.matches(&expected_sig) => {}
            Some(ExternType::Func(_)) => return Err(ErrorKind::InvalidFuncType { func }.into()),
            Some(_) => return Err(ErrorKind::InvalidExport { func }.into()),
            None => return Err(ErrorKind::MissingExport { func }.into()),
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal module (text format) that exports every function `check_module` asks for.
    const ALL_EXPORTS: &str = r#"
(module
;; Declare the function `placeholder`
(func $placeholder)
;; Export the functions so they can be called from outside the module
(export "process_transaction" (func $placeholder))
(export "process_introduction" (func $placeholder))
(export "process_revocation" (func $placeholder))
(export "http_get_state" (func $placeholder))
(export "http_post_action" (func $placeholder))
(export "get_symbols" (func $placeholder))
)
"#;

    /// Names of the exports that are removed one at a time in the test below.
    const EXPORT_NAMES: [&str; 6] = [
        "process_transaction",
        "process_introduction",
        "process_revocation",
        "http_get_state",
        "http_post_action",
        "get_symbols",
    ];

    /// Returns `original` without the lines that contain `pattern`, joined by `\n`.
    fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
        original
            .lines()
            .filter(|line| !line.contains(pattern))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// Builds an engine with the same settings the test used before (speed-optimized, sync).
    fn test_engine() -> Engine {
        let mut config = Config::new();
        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
        config.async_support(false);
        Engine::new(&config).unwrap()
    }

    #[test]
    fn missing_exports() {
        let engine = test_engine();

        // Dropping any single export must be rejected
        for func in EXPORT_NAMES {
            let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
            let module = Module::new(&engine, &wat_missing);
            assert!(module.is_ok());
            let outcome = check_module(&engine, &module.unwrap());
            assert!(outcome.is_err());
        }

        // The complete module must be accepted
        let module = Module::new(&engine, ALL_EXPORTS);
        assert!(module.is_ok());
        let outcome = check_module(&engine, &module.unwrap());
        assert!(outcome.is_ok());
    }
}