use borderless::__private::registers::*;
use borderless::common::Id;
use borderless::contracts::BlockCtx;
use borderless::prelude::ledger::LedgerEntry;
use borderless::Context;
use borderless::{
__private::storage_keys::StorageKey,
common::{Introduction, Revocation},
contracts::TxCtx,
events::CallAction,
log::LogLine,
AgentId, ContractId,
};
use borderless_kv_store::*;
use nohash::IntMap;
use rand::Rng;
use std::{
cell::RefCell,
time::{Instant, SystemTime, UNIX_EPOCH},
};
use wasmtime::{Caller, Extern, Memory};
use crate::db::controller::Controller;
use crate::db::ledger::Ledger;
use crate::{
db::action_log::{ActionLog, ActionRecord},
db::controller::{write_introduction, write_revocation},
db::logger::Logger,
error::ErrorKind,
log_shim::*,
Error, Result,
};
use borderless::events::Topic;
#[cfg(feature = "agents")]
use tokio::sync::mpsc;
/// Host-side state that is attached to the wasm store while a contract or agent runs.
///
/// Generic over the key-value backend `S`.
pub struct VmState<S: Db> {
/// Byte buffers exchanged with the guest, addressed by a numeric register id.
registers: IntMap<u64, RefCell<Vec<u8>>>,
/// Database used to open read-only and read-write transactions.
db: S,
/// Handle that is passed to every transaction read / write / delete.
db_ptr: S::Handle,
/// Start instant recorded by `tic` and evaluated by `toc`.
last_timer: Option<Instant>,
/// Log lines collected by `print`; handed out (and flushed) by `finish_exec`.
log_buffer: Vec<LogLine>,
/// Entity that is currently being executed; `ActiveEntity::None` between executions.
active: ActiveEntity,
/// Extra state for the async runtime; only `Some` when built via `new_async`.
_async: Option<AsyncState>,
}
impl<S: Db> VmState<S> {
pub fn new(db: S, db_ptr: S::Handle) -> Self {
VmState {
registers: Default::default(),
db,
db_ptr,
last_timer: None,
log_buffer: Vec::new(),
active: ActiveEntity::None,
_async: None,
}
}
pub fn new_async(db: S, db_ptr: S::Handle) -> Self {
VmState {
registers: Default::default(),
db,
db_ptr,
last_timer: None,
log_buffer: Vec::new(),
active: ActiveEntity::None,
_async: Some(AsyncState::default()),
}
}
#[must_use = "You must handle the errors of this function"]
pub fn prepare_exec(&mut self, active_entity: ActiveEntity) -> Result<()> {
if self.active.is_some() {
return Err(Error::msg(
"Cannot start a new execution while something else is still active",
));
}
debug_assert!(self.registers.remove(®ISTER_OUTPUT).is_none());
debug_assert!(self
.registers
.remove(®ISTER_OUTPUT_HTTP_RESULT)
.is_none());
debug_assert!(self
.registers
.remove(®ISTER_OUTPUT_HTTP_STATUS)
.is_none());
debug_assert!(self.log_buffer.is_empty());
self.active = active_entity;
Ok(())
}
pub fn finish_exec(&mut self, commit: Option<Commit>) -> Result<Vec<LogLine>> {
let log_output = std::mem::take(&mut self.log_buffer);
let active = std::mem::replace(&mut self.active, ActiveEntity::None);
self.clear_cursor_registers()?;
self.registers.remove(®ISTER_OUTPUT);
self.registers.remove(®ISTER_OUTPUT_HTTP_RESULT);
self.registers.remove(®ISTER_OUTPUT_HTTP_STATUS);
let commit = match commit {
Some(c) => c,
None => return Ok(log_output),
};
let (id, db_txns, ledger_entries, tx_ctx) = match active {
ActiveEntity::Contract {
cid,
db_txns,
ledger_entries,
tx_ctx,
} => (Id::contract(cid), db_txns, ledger_entries, tx_ctx),
ActiveEntity::Agent { aid, db_txns } => (Id::agent(aid), db_txns, None, None),
ActiveEntity::None => return Ok(log_output),
};
let now = Instant::now();
let mut txn = self.db.begin_rw_txn()?;
let logger = Logger::new(&self.db, id);
logger.flush_lines(&log_output, &self.db_ptr, &mut txn)?;
for op in db_txns.unwrap_or_default() {
if !op.is_userspace() {
warn!("Tried to write or remove a value with a storage-key that is not in user-space, id={id}");
continue;
}
match op {
StorageOp::Write { key, value } => txn.write(&self.db_ptr, &key, &value)?,
StorageOp::Remove { key } => txn.delete(&self.db_ptr, &key)?,
}
}
let ledger = Ledger::new(&self.db);
for entry in ledger_entries.unwrap_or_default() {
ledger.commit_entry(
&mut txn,
&entry,
id.as_cid().expect("ledgers only exist in contracts"),
tx_ctx.as_ref().expect("ledgers are only modified by txs"),
)?;
}
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("timestamp < 1970")
.as_millis()
.try_into()
.expect("u64 should fit for 584942417 years");
match commit {
Commit::Action(action) => {
let cid = id.as_cid().expect("actions are only commited in contracts");
let tx_ctx = tx_ctx.expect("actions are only commited in contracts");
let action_log = ActionLog::new(&self.db, cid);
action_log.commit(&self.db_ptr, &mut txn, &action, tx_ctx)?;
}
Commit::Introduction(mut introduction) => {
assert_eq!(introduction.id, id);
introduction.meta.active_since = timestamp;
introduction.meta.tx_ctx_introduction = tx_ctx;
write_introduction::<S>(&self.db_ptr, &mut txn, introduction.clone())?;
Controller::new(&self.db)
.messages()
.init(&mut txn, introduction)?;
}
Commit::Revocation(revocation) => {
assert_eq!(revocation.id, id);
write_revocation::<S>(&self.db_ptr, &mut txn, &revocation, timestamp, tx_ctx)?;
Controller::new(&self.db)
.messages()
.unsubscribe_all(&mut txn, id)?;
}
Commit::Other => { }
}
txn.commit()?;
let elapsed = now.elapsed();
debug!("storage commit: {elapsed:?}");
debug_assert!(self.active.is_none());
debug_assert!(self.log_buffer.is_empty());
Ok(log_output)
}
/// Builds the storage key for `base_key` / `sub_key` in the namespace of the active entity.
///
/// Fails if no entity is active.
fn get_storage_key(&self, base_key: u64, sub_key: u64) -> wasmtime::Result<StorageKey> {
    Ok(self.active.storage_key(base_key, sub_key)?)
}
pub fn set_register(&mut self, register_id: u64, value: Vec<u8>) {
self.registers.insert(register_id, value.into());
}
pub fn get_register(&self, register_id: u64) -> Option<Vec<u8>> {
self.registers.get(®ister_id).map(|v| v.borrow().clone())
}
/// Drops every register whose id is `REGISTER_CURSOR` or larger.
///
/// Currently always returns `Ok(())`.
fn clear_cursor_registers(&mut self) -> Result<()> {
    self.registers
        .retain(|register_id, _| *register_id < REGISTER_CURSOR);
    Ok(())
}
fn clear_register(&mut self, register_id: u64) {
self.registers.remove(®ister_id);
}
/// Looks up the action record at position `idx` in the action log of contract `cid`.
pub fn read_action(&self, cid: &ContractId, idx: usize) -> Result<Option<ActionRecord>> {
    let action_log = ActionLog::new(&self.db, *cid);
    action_log.get(idx)
}
/// Returns the length reported by the action log of contract `cid`.
pub fn len_actions(&self, cid: &ContractId) -> Result<u64> {
    let action_log = ActionLog::new(&self.db, *cid);
    action_log.len()
}
/// Registers the channel that `send_ws_msg` uses to forward websocket messages.
///
/// Fails with `ErrorKind::NoAsync` if the state was not created via `new_async`.
#[cfg(feature = "agents")]
pub fn register_ws(&mut self, ch: mpsc::Sender<Vec<u8>>) -> Result<()> {
    let Some(async_state) = self._async.as_mut() else {
        return Err(ErrorKind::NoAsync.into());
    };
    async_state.ws_sender = Some(ch);
    Ok(())
}
}
/// Additional state that only exists when the VM runs in the async runtime.
#[derive(Default)]
struct AsyncState {
/// Sender half of the websocket channel; set by `VmState::register_ws`.
#[cfg(feature = "agents")]
ws_sender: Option<mpsc::Sender<Vec<u8>>>,
}
fn get_memory(caller: &mut Caller<'_, VmState<impl Db>>) -> wasmtime::Result<Memory> {
match caller.get_export("memory") {
Some(Extern::Memory(mem)) => Ok(mem),
_ => Err(wasmtime::Error::msg("Failed to find memory")),
}
}
/// Allocates a zero-initialised buffer of `len` bytes.
///
/// The previous implementation called `Vec::set_len` on freshly reserved capacity, which
/// exposes uninitialised memory through a safe `Vec<u8>` and is undefined behaviour as
/// soon as the bytes are read (or the buffer is only partially filled). Zeroing the
/// buffer is sound and lets the allocator hand out pre-zeroed pages.
fn create_buffer(len: u64) -> Vec<u8> {
    // Same u64 -> usize conversion as before; it can only truncate on hosts
    // whose address space is smaller than 64 bit.
    vec![0; len as usize]
}
fn copy_wasm_memory(
caller: &mut Caller<'_, VmState<impl Db>>,
memory: &Memory,
wasm_ptr: u64,
wasm_ptr_len: u64,
) -> wasmtime::Result<Vec<u8>> {
let mut buffer = create_buffer(wasm_ptr_len);
memory
.read(caller, wasm_ptr as usize, &mut buffer)
.map_err(|e| wasmtime::Error::msg(format!("Failed to read from memory: {e}")))?;
Ok(buffer)
}
/// Starts (or restarts) the stopwatch that is read by `toc`.
pub fn tic(mut caller: Caller<'_, VmState<impl Db>>) {
    let started = Instant::now();
    caller.data_mut().last_timer = Some(started);
}
pub fn toc(caller: Caller<'_, VmState<impl Db>>) -> wasmtime::Result<u64> {
let timer = caller
.data()
.last_timer
.ok_or_else(|| wasmtime::Error::msg("no timer present"))?;
let elapsed = timer.elapsed();
Ok(elapsed
.as_nanos()
.try_into()
.expect("your program should not run for 584.942 years"))
}
pub fn print(
mut caller: Caller<'_, VmState<impl Db>>,
ptr: u64,
len: u64,
level: u32,
) -> wasmtime::Result<()> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
let memory = get_memory(&mut caller)?;
let data = memory
.data(&mut caller)
.get(ptr as usize..(ptr + len) as usize)
.ok_or_else(|| wasmtime::Error::msg("Memory access out of bounds"))?;
let msg =
String::from_utf8(data.to_vec()).unwrap_or_else(|e| format!("Invalid UTF-8 sequence: {e}"));
let line = LogLine::new(timestamp, level, msg);
caller.data_mut().log_buffer.push(line);
Ok(())
}
pub fn read_register(
mut caller: Caller<'_, VmState<impl Db>>,
register_id: u64,
ptr: u64,
) -> wasmtime::Result<()> {
let data = caller
.data()
.registers
.get(®ister_id)
.cloned()
.ok_or_else(|| wasmtime::Error::msg(format!("Register {register_id} not found")))?;
let memory = get_memory(&mut caller)?;
let mem_size = memory.data_size(&caller);
if (ptr as usize) + data.borrow().len() > mem_size {
return Err(wasmtime::Error::msg("Memory access out of bounds"));
}
memory
.write(&mut caller, ptr as usize, &data.borrow())
.map_err(|e| wasmtime::Error::msg(format!("Failed to write to memory: {e}")))?;
Ok(())
}
/// Returns the number of bytes stored in register `register_id`.
///
/// `u64::MAX` is returned as a sentinel if the register is not set.
pub fn register_len(caller: Caller<'_, VmState<impl Db>>, register_id: u64) -> u64 {
    caller
        .data()
        .registers
        .get(&register_id)
        .map_or(u64::MAX, |data| data.borrow().len() as u64)
}
/// Copies `wasm_ptr_len` bytes from the guest memory at `wasm_ptr` into register `register_id`.
///
/// Fails if the memory export is missing or the bytes cannot be read.
pub fn write_register(
    mut caller: Caller<'_, VmState<impl Db>>,
    register_id: u64,
    wasm_ptr: u64,
    wasm_ptr_len: u64,
) -> wasmtime::Result<()> {
    let memory = get_memory(&mut caller)?;
    let bytes = copy_wasm_memory(&mut caller, &memory, wasm_ptr, wasm_ptr_len)?;
    let state = caller.data_mut();
    state.set_register(register_id, bytes);
    Ok(())
}
/// Buffers a write of the guest bytes at `value_ptr..value_ptr + value_len` to the
/// storage key `base_key` / `sub_key`.
///
/// The write is only queued on the active entity; it is silently ignored when the
/// active entity is immutable.
pub fn storage_write(
    mut caller: Caller<'_, VmState<impl Db>>,
    base_key: u64,
    sub_key: u64,
    value_ptr: u64,
    value_len: u64,
) -> wasmtime::Result<()> {
    let read_only = caller.data().active.is_immutable();
    if read_only {
        return Ok(());
    }
    let memory = get_memory(&mut caller)?;
    let bytes = copy_wasm_memory(&mut caller, &memory, value_ptr, value_len)?;
    let state = caller.data_mut();
    let storage_key = state.get_storage_key(base_key, sub_key)?;
    state
        .active
        .push_storage(StorageOp::write(storage_key, bytes))?;
    Ok(())
}
/// Reads the value stored under `base_key` / `sub_key` into register `register_id`.
///
/// If the key does not exist, the register is cleared instead.
pub fn storage_read(
    mut caller: Caller<'_, VmState<impl Db>>,
    base_key: u64,
    sub_key: u64,
    register_id: u64,
) -> wasmtime::Result<()> {
    let storage_key = caller.data().get_storage_key(base_key, sub_key)?;
    let state = caller.data_mut();
    let txn = state.db.begin_ro_txn()?;
    let stored = txn.read(&state.db_ptr, &storage_key)?.map(|v| v.to_vec());
    txn.commit()?;
    match stored {
        Some(bytes) => state.set_register(register_id, bytes),
        None => state.clear_register(register_id),
    }
    Ok(())
}
/// Buffers the removal of the storage key `base_key` / `sub_key`.
///
/// The removal is only queued on the active entity; it is silently ignored when the
/// active entity is immutable.
pub fn storage_remove(
    mut caller: Caller<'_, VmState<impl Db>>,
    base_key: u64,
    sub_key: u64,
) -> wasmtime::Result<()> {
    let read_only = caller.data().active.is_immutable();
    if read_only {
        return Ok(());
    }
    let state = caller.data_mut();
    let storage_key = state.get_storage_key(base_key, sub_key)?;
    state.active.push_storage(StorageOp::remove(storage_key))?;
    Ok(())
}
pub fn storage_cursor(
mut caller: Caller<'_, VmState<impl Db>>,
base_key: u64,
) -> wasmtime::Result<u64> {
let key = caller.data().get_storage_key(base_key, 1)?;
let tgt_prefix = key.get_prefix();
let db = &caller.data().db;
let db_ptr = &caller.data().db_ptr;
let txn = db.begin_ro_txn()?;
let mut cursor = txn.ro_cursor(db_ptr)?;
let keys: Vec<u64> = cursor
.iter_from(&key)
.map(|(key, _)| StorageKey::try_from(key).expect("Slice length error"))
.take_while(|key| {
let key_prefix = key.get_prefix();
key_prefix.starts_with(&tgt_prefix)
})
.map(|key| key.sub_key())
.collect();
drop(cursor);
drop(txn);
let caller_data = &mut caller.data_mut();
for (i, key) in keys.iter().enumerate() {
let bytes = key.to_le_bytes().to_vec();
caller_data.set_register(REGISTER_CURSOR.saturating_add(i as u64), bytes);
}
Ok(keys.len() as u64)
}
/// Returns `1` if a value is stored under `base_key` / `sub_key`, otherwise `0`.
pub fn storage_has_key(
    mut caller: Caller<'_, VmState<impl Db>>,
    base_key: u64,
    sub_key: u64,
) -> wasmtime::Result<u64> {
    let storage_key = caller.data().get_storage_key(base_key, sub_key)?;
    let state = caller.data_mut();
    let txn = state.db.begin_ro_txn()?;
    let exists = txn.read(&state.db_ptr, &storage_key)?.is_some();
    txn.commit()?;
    Ok(u64::from(exists))
}
/// Generates a random `u64` that can be used as a sub-key.
pub fn storage_gen_sub_key() -> wasmtime::Result<u64> {
    let sub_key: u64 = rand::rng().random();
    Ok(sub_key)
}
/// Parses a ledger entry from the guest memory and buffers it on the active contract.
///
/// Return codes:
/// * `0` - the entry was buffered (or the active entity is immutable and nothing was done)
/// * `1` - the creditor is not a participant of the contract
/// * `2` - the debitor is not a participant of the contract
/// * `3` - neither creditor nor debitor is a participant of the contract
///
/// # Errors
///
/// Fails if the bytes cannot be read or parsed, or if the active entity is not a contract.
pub fn create_ledger_entry(
    mut caller: Caller<'_, VmState<impl Db>>,
    wasm_ptr: u64,
    wasm_len: u64,
) -> wasmtime::Result<u64> {
    if caller.data().active.is_immutable() {
        return Ok(0);
    }
    let memory = get_memory(&mut caller)?;
    let raw_entry = copy_wasm_memory(&mut caller, &memory, wasm_ptr, wasm_len)?;
    let entry = LedgerEntry::from_bytes(&raw_entry)?;
    let state = caller.data();
    let cid = state
        .active
        .is_contract()
        .ok_or_else(|| Error::msg("ledger-entry can only be created in contracts"))?;
    let participants = Controller::new(&state.db)
        .contract_participants(&cid)?
        .unwrap_or_default();
    let creditor_known = participants.iter().any(|p| p.id == entry.creditor);
    let debitor_known = participants.iter().any(|p| p.id == entry.debitor);
    match (creditor_known, debitor_known) {
        (true, true) => {
            caller.data_mut().active.push_ledger(entry)?;
            Ok(0)
        }
        (false, true) => Ok(1),
        (true, false) => Ok(2),
        (false, false) => Ok(3),
    }
}
pub fn rand(min: u64, max: u64) -> wasmtime::Result<u64> {
let mut rng = rand::rng();
let value: u64 = rng.random_range(min..max);
Ok(value)
}
/// Returns the timestamp that is visible to the active entity.
///
/// * Contracts get the timestamp of the block-context stored in `REGISTER_BLOCK_CTX`.
/// * Agents get the current system time in milliseconds since the unix epoch.
///
/// Fails if no entity is active, or if the block-context is missing or invalid.
pub fn timestamp(caller: Caller<'_, VmState<impl Db>>) -> wasmtime::Result<i64> {
    let state = caller.data();
    match &state.active {
        ActiveEntity::None => Err(wasmtime::Error::msg("no active entity set")),
        ActiveEntity::Agent { .. } => {
            let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH)?;
            let millis = i64::try_from(since_epoch.as_millis())
                .expect("i64 should fit for 292471208 years");
            Ok(millis)
        }
        ActiveEntity::Contract { .. } => {
            let raw_ctx = state
                .get_register(REGISTER_BLOCK_CTX)
                .context("missing block-ctx")?;
            let block_ctx =
                BlockCtx::from_bytes(&raw_ctx).context("invalid block-ctx in register")?;
            Ok(block_ctx.timestamp as i64)
        }
    }
}
/// Subscribes the active agent to the topic that is encoded in the guest memory.
///
/// Returns `1` if the active entity is not an agent and `0` otherwise. Nothing is
/// subscribed when the agent is immutable.
pub fn subscribe(
    mut caller: Caller<'_, VmState<impl Db>>,
    wasm_ptr: u64,
    wasm_len: u64,
) -> wasmtime::Result<u64> {
    let Some(aid) = caller.data().active.is_agent() else {
        return Ok(1);
    };
    if caller.data().active.is_immutable() {
        return Ok(0);
    }
    let memory = get_memory(&mut caller)?;
    let raw_topic = copy_wasm_memory(&mut caller, &memory, wasm_ptr, wasm_len)?;
    let topic = Topic::from_bytes(&raw_topic)?;
    Controller::new(&caller.data().db)
        .messages()
        .subscribe(aid, topic)?;
    Ok(0)
}
/// Removes the subscription of the active agent for the topic encoded in the guest memory.
///
/// Returns `1` if the active entity is not an agent and `0` otherwise. Nothing is
/// changed when the agent is immutable.
pub fn unsubscribe(
    mut caller: Caller<'_, VmState<impl Db>>,
    wasm_ptr: u64,
    wasm_len: u64,
) -> wasmtime::Result<u64> {
    let Some(aid) = caller.data().active.is_agent() else {
        return Ok(1);
    };
    if caller.data().active.is_immutable() {
        return Ok(0);
    }
    let memory = get_memory(&mut caller)?;
    let raw_topic = copy_wasm_memory(&mut caller, &memory, wasm_ptr, wasm_len)?;
    let topic = Topic::from_bytes(&raw_topic)?;
    Controller::new(&caller.data().db)
        .messages()
        .unsubscribe(aid, topic)?;
    Ok(0)
}
#[cfg(feature = "agents")]
pub mod async_abi {
use borderless::Context;
use super::*;
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Client, Method as ReqwestMethod, Request, Response,
};
use std::{result::Result, str::FromStr, time::Duration};
pub async fn send_ws_msg(
mut caller: Caller<'_, VmState<impl Db>>,
msg_ptr: u64,
msg_len: u64,
) -> wasmtime::Result<u64> {
let agent_id = caller
.data()
.active
.is_agent()
.ok_or_else(|| wasmtime::Error::msg("only sw-agents can send ws-msgs"))?;
let memory = get_memory(&mut caller)?;
let data = memory
.data(&mut caller)
.get(msg_ptr as usize..(msg_ptr + msg_len) as usize)
.ok_or_else(|| wasmtime::Error::msg("Memory access out of bounds"))?
.to_vec();
let state = caller
.data()
._async
.as_ref()
.ok_or_else(|| wasmtime::Error::msg("missing async-state in async runtime"))?;
match &state.ws_sender {
Some(ch) => {
match tokio::time::timeout(Duration::from_secs(10), ch.send(data)).await {
Ok(Ok(())) => Ok(0),
Ok(Err(_)) => {
warn!("failed to send websocket message for agent {agent_id} - receiver closed");
Ok(1)
}
Err(_) => {
warn!("failed to send websocket message for agent {agent_id} - timeout");
Ok(2)
}
}
}
None => {
warn!("failed to send websocket message for agent {agent_id} - websocket is not registered");
Ok(3)
}
}
}
/// Executes the http request that the guest placed into the request registers.
///
/// The request head is taken from `register_rq_head` and the body from
/// `register_rq_body`; both registers are consumed. On success (`0`) the response head
/// and body are written to `register_rs_head` / `register_rs_body`. If the request
/// cannot be built, sent or read, `1` is returned and a description of the failure is
/// written to `register_failure`.
///
/// # Errors
///
/// Fails (instead of returning `1`) if one of the request registers is missing or the
/// request head is not valid UTF-8.
pub async fn send_http_rq(
    mut caller: Caller<'_, VmState<impl Db>>,
    register_rq_head: u64,
    register_rq_body: u64,
    register_rs_head: u64,
    register_rs_body: u64,
    register_failure: u64,
) -> wasmtime::Result<u64> {
    let head = caller
        .data_mut()
        .registers
        .remove(&register_rq_head)
        .context("missing rq-head")?
        .into_inner();
    let head = String::from_utf8(head)?;
    let body = caller
        .data_mut()
        .registers
        .remove(&register_rq_body)
        .context("missing rq-body")?
        .into_inner();
    let client = Client::new();
    let rq = match parse_reqwest_request_from_parts(&client, &head, body) {
        Ok(rq) => rq,
        Err(e) => {
            caller
                .data_mut()
                .set_register(register_failure, e.into_bytes());
            return Ok(1);
        }
    };
    let rs = match client.execute(rq).await {
        Ok(rs) => rs,
        Err(e) => {
            caller
                .data_mut()
                .set_register(register_failure, e.to_string().into_bytes());
            return Ok(1);
        }
    };
    let (rs_head, rs_body) = match serialize_response_for_ffi(rs).await {
        Ok(rs) => rs,
        Err(e) => {
            caller
                .data_mut()
                .set_register(register_failure, e.to_string().into_bytes());
            return Ok(1);
        }
    };
    caller
        .data_mut()
        .set_register(register_rs_head, rs_head.into_bytes());
    caller.data_mut().set_register(register_rs_body, rs_body);
    Ok(0)
}
/// Builds a `reqwest` request from a textual request head and a raw body.
///
/// The first line of `head` must consist of method, uri and http version (the version is
/// required but otherwise ignored). Every following non-blank line must be a
/// `name: value` header. The body is only attached if it is not empty.
///
/// Returns a description of the problem if the head is malformed or the request cannot
/// be built.
fn parse_reqwest_request_from_parts(
    client: &Client,
    head: &str,
    body: Vec<u8>,
) -> Result<Request, String> {
    let mut lines = head.lines();
    let Some(request_line) = lines.next() else {
        return Err("Empty request head".to_string());
    };

    // Request line: `<method> <uri> <version>`
    let mut tokens = request_line.split_whitespace();
    let Some(method_str) = tokens.next() else {
        return Err("No HTTP method found".to_string());
    };
    let Some(uri) = tokens.next() else {
        return Err("No URI found".to_string());
    };
    if tokens.next().is_none() {
        return Err("No HTTP version found".to_string());
    }
    let method = ReqwestMethod::from_str(method_str).map_err(|e| e.to_string())?;

    // Header lines; blank lines are skipped.
    let mut headers = HeaderMap::new();
    for line in lines.filter(|line| !line.trim().is_empty()) {
        let Some((name, value)) = line.split_once(':') else {
            return Err(format!("Malformed header line: {}", line));
        };
        let header_name = HeaderName::from_str(name.trim()).map_err(|e| e.to_string())?;
        let header_value = HeaderValue::from_str(value.trim()).map_err(|e| e.to_string())?;
        headers.append(header_name, header_value);
    }

    let mut builder = client.request(method, uri).headers(headers);
    if !body.is_empty() {
        builder = builder.body(body);
    }
    builder.build().map_err(|e| e.to_string())
}
/// Renders status line and headers of `resp` as text, terminated by an empty line.
///
/// Every line ends with `\r\n`. Fails for http versions other than 1.0, 1.1 and 2, and
/// for header values that cannot be represented as a string.
fn serialize_response_head(resp: &Response) -> Result<String, String> {
    let version = match resp.version() {
        reqwest::Version::HTTP_10 => "HTTP/1.0",
        reqwest::Version::HTTP_11 => "HTTP/1.1",
        reqwest::Version::HTTP_2 => "HTTP/2",
        other => return Err(format!("Unsupported HTTP version: {:?}", other)),
    };
    let status = resp.status();
    let reason = status.canonical_reason().unwrap_or("");
    let mut head = format!("{} {} {}\r\n", version, status.as_u16(), reason);
    for (name, value) in resp.headers() {
        let value = value
            .to_str()
            .map_err(|e| format!("failed to read header value: {e}"))?;
        head.push_str(name.as_str());
        head.push_str(": ");
        head.push_str(value);
        head.push_str("\r\n");
    }
    // An empty line marks the end of the head.
    head.push_str("\r\n");
    Ok(head)
}
/// Splits `resp` into its serialized head and the raw body bytes.
///
/// Fails if the head cannot be serialized or the body cannot be read.
async fn serialize_response_for_ffi(resp: Response) -> Result<(String, Vec<u8>), String> {
    let head = serialize_response_head(&resp)?;
    match resp.bytes().await {
        Ok(bytes) => Ok((head, bytes.to_vec())),
        Err(e) => Err(format!("failed to read response body: {e}")),
    }
}
#[cfg(test)]
mod async_abi_tests {
    use super::*;
    use reqwest::Method;

    /// Runs the parser with a fresh client.
    fn parse(head: &str, body: &[u8]) -> std::result::Result<Request, String> {
        let client = Client::new();
        parse_reqwest_request_from_parts(&client, head, body.to_vec())
    }

    #[test]
    fn test_valid_post_request() {
        let payload = b"{\"hello\":\"world\"}";
        let request = parse(
            "POST https://example.com/api HTTP/1.1\r\nContent-Type: application/json\r\nX-Test: 42\r\n\r\n",
            payload,
        )
        .unwrap();
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.url().as_str(), "https://example.com/api");
        assert_eq!(request.headers()["Content-Type"], "application/json");
        assert_eq!(request.headers()["X-Test"], "42");
        assert_eq!(request.body().unwrap().as_bytes().unwrap(), &payload[..]);
    }

    #[test]
    fn test_valid_get_request() {
        let request = parse("GET https://example.com/ HTTP/1.1\r\nAccept: */*\r\n\r\n", &[]).unwrap();
        assert_eq!(request.method(), Method::GET);
        assert_eq!(request.url().as_str(), "https://example.com/");
        assert_eq!(request.headers()["Accept"], "*/*");
        // An empty body must not be attached to the request.
        assert!(request.body().is_none());
    }

    #[test]
    fn test_missing_method_error() {
        let outcome = parse(
            " https://example.com/api HTTP/1.1\r\nContent-Type: text/plain\r\n\r\n",
            b"Missing method",
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_uri_error() {
        let outcome = parse(
            "POST HTTP/1.1\r\nContent-Type: text/plain\r\n\r\n",
            b"Missing URI",
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_missing_version_error() {
        let outcome = parse(
            "POST https://example.com/api\r\nContent-Type: text/plain\r\n\r\n",
            b"Missing version",
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_malformed_header_error() {
        let outcome = parse(
            "POST https://example.com/api HTTP/1.1\r\nBadHeaderWithoutColon\r\n\r\n",
            b"Malformed header",
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_empty_head_error() {
        let outcome = parse("", b"Empty head");
        assert!(outcome.is_err());
    }
}
}
/// Selects what `VmState::finish_exec` persists in addition to logs, storage
/// operations and ledger entries.
pub enum Commit {
/// Appends the action to the action log of the active contract.
Action(CallAction),
/// Writes the introduction and initializes the message subscriptions with it.
Introduction(Introduction),
/// Writes the revocation and removes all message subscriptions of the entity.
Revocation(Revocation),
/// Nothing else is persisted.
Other,
}
/// The entity that is currently executed by the VM, together with its buffered side effects.
///
/// `db_txns` being `None` marks the entity as immutable (see `is_immutable`).
pub enum ActiveEntity {
/// A contract is being executed.
Contract {
cid: ContractId,
/// Buffered storage operations; `None` for immutable executions.
db_txns: Option<Vec<StorageOp>>,
/// Buffered ledger entries; `None` for immutable executions.
ledger_entries: Option<Vec<LedgerEntry>>,
/// Context of the transaction; `None` when built via `contract_http`.
tx_ctx: Option<TxCtx>,
},
/// An agent is being executed.
Agent {
aid: AgentId,
/// Buffered storage operations; `None` for immutable executions.
db_txns: Option<Vec<StorageOp>>,
},
/// Nothing is being executed.
None,
}
impl ActiveEntity {
pub fn contract_tx(cid: ContractId, mutable: bool, tx_ctx: TxCtx) -> Self {
let db_txns = if mutable { Some(Vec::new()) } else { None };
let ledger_entries = if mutable { Some(Vec::new()) } else { None };
ActiveEntity::Contract {
cid,
db_txns,
ledger_entries,
tx_ctx: Some(tx_ctx),
}
}
pub fn contract_http(cid: ContractId) -> Self {
ActiveEntity::Contract {
cid,
db_txns: None,
ledger_entries: None,
tx_ctx: None,
}
}
pub fn agent(aid: AgentId, mutable: bool) -> Self {
let db_txns = if mutable { Some(Vec::new()) } else { None };
ActiveEntity::Agent { aid, db_txns }
}
pub fn none() -> Self {
ActiveEntity::None
}
fn is_some(&self) -> bool {
!self.is_none()
}
fn is_none(&self) -> bool {
matches!(self, ActiveEntity::None)
}
fn is_agent(&self) -> Option<AgentId> {
match self {
ActiveEntity::Agent { aid, .. } => Some(*aid),
_ => None,
}
}
fn is_contract(&self) -> Option<ContractId> {
match self {
ActiveEntity::Contract { cid, .. } => Some(*cid),
_ => None,
}
}
fn storage_key(&self, base_key: u64, sub_key: u64) -> Result<StorageKey> {
match self {
ActiveEntity::Contract { cid, .. } => Ok(StorageKey::new(cid, base_key, sub_key)),
ActiveEntity::Agent { aid, .. } => Ok(StorageKey::new(aid, base_key, sub_key)),
ActiveEntity::None => Err(ErrorKind::NoActiveEntity.into()),
}
}
fn is_immutable(&self) -> bool {
match self {
ActiveEntity::Contract { db_txns, .. } | ActiveEntity::Agent { db_txns, .. } => {
db_txns.is_none()
}
ActiveEntity::None => false,
}
}
fn push_storage(&mut self, op: StorageOp) -> Result<()> {
match self {
ActiveEntity::Contract { db_txns, .. } | ActiveEntity::Agent { db_txns, .. } => {
if let Some(db_txns) = db_txns {
db_txns.push(op);
Ok(())
} else {
Err(ErrorKind::Immutable.into())
}
}
ActiveEntity::None => Err(ErrorKind::NoActiveEntity.into()),
}
}
fn push_ledger(&mut self, entry: LedgerEntry) -> Result<()> {
match self {
ActiveEntity::Contract { ledger_entries, .. } => {
if let Some(ledger) = ledger_entries {
ledger.push(entry);
Ok(())
} else {
Err(ErrorKind::Immutable.into())
}
}
ActiveEntity::Agent { .. } | ActiveEntity::None => {
Err(ErrorKind::NoActiveEntity.into())
}
}
}
}
/// A storage operation that is buffered during execution and applied by `VmState::finish_exec`.
pub enum StorageOp {
/// Write `value` to `key`.
Write { key: StorageKey, value: Vec<u8> },
/// Delete `key`.
Remove { key: StorageKey },
}
impl StorageOp {
    /// Operation that writes `value` to `key`.
    pub fn write(key: StorageKey, value: Vec<u8>) -> Self {
        StorageOp::Write { key, value }
    }

    /// Operation that removes `key`.
    pub fn remove(key: StorageKey) -> Self {
        StorageOp::Remove { key }
    }

    /// `true` if the key of this operation is a user key.
    pub fn is_userspace(&self) -> bool {
        let key = match self {
            Self::Write { key, .. } => key,
            Self::Remove { key } => key,
        };
        key.is_user_key()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use backend::lmdb::Lmdb;
    use tempfile::{tempdir, TempDir};

    /// Builds a state on top of a throw-away lmdb instance.
    ///
    /// The temp-dir is returned as well, so it outlives the state.
    fn dummy_vm_state() -> (VmState<Lmdb>, TempDir) {
        let tmp_dir = tempdir().expect("failed to create tmp directory for testing");
        let db = Lmdb::new(tmp_dir.path(), 2).expect("failed to create lmdb");
        let db_ptr = db
            .create_sub_db("dummy-sub-db")
            .expect("failed to create sub-db");
        (VmState::new(db, db_ptr), tmp_dir)
    }

    #[test]
    fn finish_none() {
        let (mut state, _tmp_dir) = dummy_vm_state();
        let outcome = state.finish_exec(None);
        assert!(outcome.is_ok());
        let logs = outcome.unwrap();
        assert!(logs.is_empty(), "no logs should have been written");
    }

    #[test]
    fn finish_resets_everything() {
        let (mut state, _tmp_dir) = dummy_vm_state();
        let registers = [
            REGISTER_OUTPUT,
            REGISTER_CURSOR + 1,
            REGISTER_OUTPUT_HTTP_RESULT,
            REGISTER_OUTPUT_HTTP_STATUS,
        ];
        for register_id in registers {
            state.set_register(register_id, vec![1, 2, 3, 4]);
        }
        let _res = state.finish_exec(None);
        for register_id in registers {
            assert!(
                state.get_register(register_id).is_none(),
                "registers must be cleared"
            );
        }
        assert!(
            matches!(state.active, ActiveEntity::None),
            "active item must be reset"
        );
    }

    #[test]
    fn no_storage_key_on_inactive() {
        let (state, _tmp_dir) = dummy_vm_state();
        assert!(state.get_storage_key(0, 0).is_err());
    }

    #[test]
    fn storage_key_prefix_matches_active() {
        let (mut state, _tmp_dir) = dummy_vm_state();

        // Contract
        let cid = ContractId::generate();
        state.active = ActiveEntity::contract_http(cid);
        let contract_key = state.get_storage_key(0, 0);
        assert!(contract_key.is_ok());
        let key_cid = contract_key.unwrap().contract_id();
        assert!(key_cid.is_some());
        assert_eq!(key_cid.unwrap(), cid);

        // Agent
        let aid = AgentId::generate();
        state.active = ActiveEntity::agent(aid, false);
        let agent_key = state.get_storage_key(0, 0);
        assert!(agent_key.is_ok());
        let key_aid = agent_key.unwrap().agent_id();
        assert!(key_aid.is_some());
        assert_eq!(key_aid.unwrap(), aid);
    }
}