#[cfg(feature = "contracts")]
pub mod contract;
#[cfg(feature = "agents")]
pub mod agent;
#[cfg(any(feature = "contracts", feature = "agents"))]
mod vm;
#[cfg(any(feature = "contracts", feature = "agents"))]
pub use code_store::CodeStore;
#[cfg(any(feature = "contracts", feature = "agents"))]
pub mod factory {
use std::num::NonZeroUsize;
use super::CodeStore;
use crate::{AgentLock, AgentRuntime, ContractLock, ContractRuntime, Result};
use borderless_kv_store::Db;
pub struct RtFactory<'a, S: Db> {
code_store: Option<CodeStore<S>>,
#[cfg(feature = "contracts")]
lck_contract: Option<ContractLock>,
#[cfg(feature = "agents")]
lck_agent: Option<AgentLock>,
db: &'a S,
}
impl<'a, S: Db> RtFactory<'a, S> {
pub fn new(db: &'a S) -> Self {
Self {
code_store: None,
#[cfg(feature = "agents")]
lck_agent: None,
#[cfg(feature = "contracts")]
lck_contract: None,
db,
}
}
pub fn with_store(db: &'a S, code_store: CodeStore<S>) -> Self {
Self {
code_store: Some(code_store),
#[cfg(feature = "agents")]
lck_agent: None,
#[cfg(feature = "contracts")]
lck_contract: None,
db,
}
}
pub fn set_cache_size(&mut self, cache_size: NonZeroUsize) -> Result<()> {
match &mut self.code_store {
Some(_cache) => {
panic!("cannot initialize cache twice")
}
None => {
self.code_store = Some(CodeStore::with_cache_size(self.db, cache_size)?);
}
}
Ok(())
}
#[cfg(feature = "contracts")]
pub fn spawn_contract_rt(&mut self) -> Result<ContractRuntime<S>> {
if self.code_store.is_none() {
self.code_store = Some(CodeStore::new(self.db)?);
}
let code_store = self.code_store.as_ref().unwrap();
if self.lck_contract.is_none() {
self.lck_contract = Some(ContractLock::default());
}
let lock = self.lck_contract.as_ref().unwrap();
ContractRuntime::new(self.db, code_store.clone(), lock.clone())
}
#[cfg(feature = "agents")]
pub fn spawn_agent_rt(&mut self) -> Result<AgentRuntime<S>> {
if self.code_store.is_none() {
self.code_store = Some(CodeStore::new(self.db)?);
}
let code_store = self.code_store.as_ref().unwrap();
if self.lck_agent.is_none() {
self.lck_agent = Some(AgentLock::default());
}
let lock = self.lck_agent.as_ref().unwrap();
AgentRuntime::new(self.db, code_store.clone(), lock.clone())
}
}
}
#[cfg(feature = "code-store")]
pub mod code_store {
use super::vm::VmState;
use borderless::{aid_prefix, cid_prefix, AgentId, ContractId};
use borderless_kv_store::{Db, RawRead, RawWrite, Tx};
use lru::LruCache;
use parking_lot::Mutex;
use std::time::Instant;
use std::{num::NonZeroUsize, sync::Arc};
use wasmtime::{Engine, Instance, Linker, Module, Store};
use crate::{log_shim::*, AGENT_SUB_DB, CONTRACT_SUB_DB};
use crate::{Result, WASM_CODE_SUB_DB};
type Id = [u8; 16];
#[derive(Clone)]
pub struct CodeStore<S: Db> {
db: S,
cache: Arc<Mutex<LruCache<Id, Module, ahash::RandomState>>>,
}
impl<S: Db> CodeStore<S> {
pub fn new(db: &S) -> Result<Self> {
Self::with_cache_size(db, NonZeroUsize::new(16).unwrap())
}
pub fn with_cache_size(db: &S, cache_size: NonZeroUsize) -> Result<Self> {
let _db_ptr = db.create_sub_db(WASM_CODE_SUB_DB)?;
let cache = LruCache::with_hasher(cache_size, ahash::RandomState::default());
Ok(Self {
db: db.clone(),
cache: Arc::new(Mutex::new(cache)),
})
}
pub fn create_store(&self, engine: &Engine) -> Result<Store<VmState<S>>> {
let db_ptr = if engine.is_async() {
self.db.open_sub_db(AGENT_SUB_DB)?
} else {
self.db.open_sub_db(CONTRACT_SUB_DB)?
};
let state = VmState::new(self.db.clone(), db_ptr);
let store = Store::new(engine, state);
Ok(store)
}
pub fn insert_contract(&self, cid: ContractId, module: Module) -> Result<()> {
let module_bytes = module.serialize()?;
let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
let mut txn = self.db.begin_rw_txn()?;
txn.write(&db_ptr, &cid, &module_bytes)?;
txn.commit()?;
Ok(())
}
pub fn insert_swagent(&self, aid: AgentId, module: Module) -> Result<()> {
let module_bytes = module.serialize()?;
let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
let mut txn = self.db.begin_rw_txn()?;
txn.write(&db_ptr, &aid, &module_bytes)?;
txn.commit()?;
Ok(())
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid)))]
pub fn get_contract(
&mut self,
cid: &ContractId,
engine: &Engine,
linker: &mut Linker<VmState<S>>,
) -> Result<Option<(Instance, Store<VmState<S>>)>> {
let start = Instant::now();
let module = match self.read_module(cid.as_bytes(), engine)? {
Some(m) => m,
None => return Ok(None),
};
let elapsed = start.elapsed();
debug!("Read module in {elapsed:?}");
let start = Instant::now();
let mut store = self.create_store(engine)?;
let instance = linker.instantiate(&mut store, &module)?;
let elapsed = start.elapsed();
debug!("Instantiated module in {elapsed:?}");
Ok(Some((instance, store)))
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid)))]
pub async fn get_agent(
&mut self,
aid: &AgentId,
engine: &Engine,
linker: &mut Linker<VmState<S>>,
) -> Result<Option<(Instance, Store<VmState<S>>)>> {
let start = Instant::now();
let module = match self.read_module(aid.as_bytes(), engine)? {
Some(m) => m,
None => return Ok(None),
};
let elapsed = start.elapsed();
debug!("Read module in {elapsed:?}");
let start = Instant::now();
let mut store = self.create_store(engine)?;
let instance = linker.instantiate_async(&mut store, &module).await?;
let elapsed = start.elapsed();
debug!("Instantiated module in {elapsed:?}");
Ok(Some((instance, store)))
}
fn read_module(&mut self, key: &[u8; 16], engine: &Engine) -> Result<Option<Module>> {
if let Some(module) = self.cache.lock().get(key.as_ref()) {
return Ok(Some(module.clone()));
}
let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
let txn = self.db.begin_ro_txn()?;
let module_bytes = txn.read(&db_ptr, &key)?;
let module = match module_bytes {
Some(bytes) => unsafe { Module::deserialize(engine, bytes)? },
None => return Ok(None),
};
txn.commit()?;
self.cache.lock().push(*key, module.clone());
Ok(Some(module))
}
pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
use borderless_kv_store::*;
let mut out = Vec::new();
let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
let txn = self.db.begin_ro_txn()?;
let mut cursor = txn.ro_cursor(&db_ptr)?;
for (key, _value) in cursor.iter().filter(|(key, _)| cid_prefix(key)) {
let cid =
ContractId::from_bytes(key.try_into().map_err(|_| {
crate::Error::msg("failed to parse contract-id from storage")
})?);
out.push(cid);
}
drop(cursor);
txn.commit()?;
Ok(out)
}
pub fn available_swagents(&self) -> Result<Vec<AgentId>> {
use borderless_kv_store::*;
let mut out = Vec::new();
let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
let txn = self.db.begin_ro_txn()?;
let mut cursor = txn.ro_cursor(&db_ptr)?;
for (key, _value) in cursor.iter().filter(|(key, _)| aid_prefix(key)) {
let aid = AgentId::from_bytes(
key.try_into()
.map_err(|_| crate::Error::msg("failed to parse agent-id from storage"))?,
);
out.push(aid);
}
drop(cursor);
txn.commit()?;
Ok(out)
}
pub fn get_db(&self) -> S {
self.db.clone()
}
}
}