use super::{
action_log::{ActionLog, ActionRecord, RelTxAction},
ledger::Ledger,
logger::Logger,
subscriptions::SubscriptionHandler,
};
use crate::log_shim::warn;
use crate::{Result, ACTION_TX_REL_SUB_DB, AGENT_SUB_DB, CONTRACT_SUB_DB};
use borderless::common::Participant;
use borderless::events::Events;
use borderless::{
common::{Description, Metadata, Revocation},
contracts::Info,
ContractId,
__private::storage_keys::*,
events::Sink,
hash::Hash256,
http::{AgentInfo, ContractInfo},
pkg::{Source, SourceFlattened, WasmPkg, WasmPkgNoSource},
prelude::{Id, TxCtx},
AgentId, TxIdentifier,
};
use borderless_kv_store::*;
use serde::de::DeserializeOwned;
pub struct Controller<'a, S: Db> {
db: &'a S,
}
impl<'a, S: Db> Controller<'a, S> {
pub fn new(db: &'a S) -> Self {
Self { db }
}
pub fn actions(&self, cid: ContractId) -> ActionLog<'a, S> {
ActionLog::new(self.db, cid)
}
pub fn logs(&self, id: impl Into<Id>) -> Logger<'a, S> {
Logger::new(self.db, id)
}
pub fn ledger(&self) -> Ledger<'a, S> {
Ledger::new(self.db)
}
pub fn messages(&self) -> SubscriptionHandler<'a, S> {
SubscriptionHandler::new(self.db)
}
pub fn contract_participants(&self, cid: &ContractId) -> Result<Option<Vec<Participant>>> {
self.read_value(
&Id::contract(*cid),
BASE_KEY_METADATA,
META_SUB_KEY_PARTICIPANTS,
)
}
pub fn contract_exists(&self, cid: &ContractId) -> Result<bool> {
Ok(self
.read_value::<ContractId>(&Id::contract(*cid), BASE_KEY_METADATA, META_SUB_KEY_ID)?
.is_some())
}
pub fn agent_exists(&self, aid: &AgentId) -> Result<bool> {
Ok(self
.read_value::<AgentId>(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_ID)?
.is_some())
}
pub fn contract_revoked(&self, cid: &ContractId) -> Result<bool> {
Ok(self.contract_revoked_ts(cid)?.is_some())
}
pub fn agent_revoked(&self, aid: &AgentId) -> Result<bool> {
Ok(self.agent_revoked_ts(aid)?.is_some())
}
pub fn contract_revoked_ts(&self, cid: &ContractId) -> Result<Option<u64>> {
self.read_value::<u64>(
&Id::contract(*cid),
BASE_KEY_METADATA,
META_SUB_KEY_REVOKED_TS,
)
}
pub fn agent_revoked_ts(&self, aid: &AgentId) -> Result<Option<u64>> {
self.read_value::<u64>(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_REVOKED_TS)
}
pub fn contract_last_tx_hash(&self, cid: &ContractId) -> Result<Option<Hash256>> {
let actions = ActionLog::new(self.db, *cid);
if let Some(action) = actions.last()? {
return Ok(Some(action.tx_ctx.tx_id.hash));
}
match self.contract_meta(cid)? {
Some(meta) => Ok(meta.tx_ctx_introduction.map(|t| t.tx_id.hash)),
None => Ok(None),
}
}
pub fn contract_info(&self, cid: &ContractId) -> Result<Option<Info>> {
let id = Id::contract(*cid);
let participants = self.read_value(&id, BASE_KEY_METADATA, META_SUB_KEY_PARTICIPANTS)?;
let sinks = self.read_value(&id, BASE_KEY_METADATA, META_SUB_KEY_SINKS)?;
match (participants, sinks) {
(Some(participants), Some(sinks)) => Ok(Some(Info {
contract_id: *cid,
participants,
sinks,
})),
_ => Ok(None),
}
}
pub fn agent_sinks(&self, aid: &AgentId) -> Result<Option<Vec<Sink>>> {
let aid = Id::agent(*aid);
self.read_value(&aid, BASE_KEY_METADATA, META_SUB_KEY_SINKS)
}
pub fn agent_subs(&self, aid: &AgentId) -> Result<Vec<String>> {
self.messages().get_subscriptions(*aid)
}
pub fn contract_desc(&self, cid: &ContractId) -> Result<Option<Description>> {
self.read_value(&Id::contract(*cid), BASE_KEY_METADATA, META_SUB_KEY_DESC)
}
pub fn agent_desc(&self, aid: &AgentId) -> Result<Option<Description>> {
self.read_value(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_DESC)
}
pub fn contract_meta(&self, cid: &ContractId) -> Result<Option<Metadata>> {
self.read_value(&Id::contract(*cid), BASE_KEY_METADATA, META_SUB_KEY_META)
}
pub fn agent_meta(&self, aid: &AgentId) -> Result<Option<Metadata>> {
self.read_value(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_META)
}
pub fn contract_full(&self, cid: &ContractId) -> Result<Option<ContractInfo>> {
let info = self.contract_info(cid)?;
let desc = self.contract_desc(cid)?;
let meta = self.contract_meta(cid)?;
Ok(Some(ContractInfo { info, desc, meta }))
}
pub fn agent_full(&self, aid: &AgentId) -> Result<Option<AgentInfo>> {
let sinks = self.agent_sinks(aid)?.unwrap_or_default();
let subs = self.agent_subs(aid)?;
let desc = self.agent_desc(aid)?;
let meta = self.agent_meta(aid)?;
Ok(Some(AgentInfo {
agent_id: *aid,
sinks,
subs,
desc,
meta,
}))
}
pub fn contract_revocation(&self, cid: &ContractId) -> Result<Option<Revocation>> {
self.read_value(
&Id::contract(*cid),
BASE_KEY_METADATA,
META_SUB_KEY_REVOCATION,
)
}
pub fn agent_revocation(&self, aid: &AgentId) -> Result<Option<Revocation>> {
self.read_value(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_REVOCATION)
}
pub fn query_action(&self, tx_id: &TxIdentifier) -> Result<Option<ActionRecord>> {
let tx_id_bytes = tx_id.to_bytes();
let relation = {
let rel_db = self.db.create_sub_db(ACTION_TX_REL_SUB_DB)?;
let txn = self.db.begin_ro_txn()?;
match txn.read(&rel_db, &tx_id_bytes)? {
Some(bytes) => RelTxAction::from_bytes(bytes),
None => return Ok(None),
}
};
match self
.actions(relation.cid)
.get(relation.action_idx as usize)?
{
Some(record) => {
debug_assert!(record.tx_ctx.tx_id == *tx_id, "tx-id must match");
Ok(Some(record))
}
None => Ok(None),
}
}
pub fn agent_pkg_def(&self, aid: &AgentId) -> Result<Option<WasmPkgNoSource>> {
self.read_value(
&Id::agent(*aid),
BASE_KEY_METADATA,
META_SUB_KEY_PACKAGE_DEF,
)
}
pub fn agent_pkg_source(&self, aid: &AgentId) -> Result<Option<Source>> {
let source: Option<SourceFlattened> = self.read_value(
&Id::agent(*aid),
BASE_KEY_METADATA,
META_SUB_KEY_PACKAGE_SOURCE,
)?;
Ok(source.map(|s| s.unflatten()))
}
pub fn agent_pkg_full(&self, aid: &AgentId) -> Result<Option<WasmPkg>> {
let pkg_def = self.agent_pkg_def(aid)?;
let source = self.agent_pkg_source(aid)?;
match (pkg_def, source) {
(Some(pkg), Some(source)) => Ok(Some(WasmPkg::from_def_and_source(pkg, source))),
_ => Ok(None),
}
}
pub fn contract_pkg_def(&self, aid: &ContractId) -> Result<Option<WasmPkgNoSource>> {
self.read_value(
&Id::contract(*aid),
BASE_KEY_METADATA,
META_SUB_KEY_PACKAGE_DEF,
)
}
pub fn contract_pkg_source(&self, aid: &ContractId) -> Result<Option<Source>> {
let source: Option<SourceFlattened> = self.read_value(
&Id::contract(*aid),
BASE_KEY_METADATA,
META_SUB_KEY_PACKAGE_SOURCE,
)?;
Ok(source.map(|s| s.unflatten()))
}
pub fn contract_pkg_full(&self, aid: &ContractId) -> Result<Option<WasmPkg>> {
let pkg_def = self.contract_pkg_def(aid)?;
let source = self.contract_pkg_source(aid)?;
match (pkg_def, source) {
(Some(pkg), Some(source)) => Ok(Some(WasmPkg::from_def_and_source(pkg, source))),
_ => Ok(None),
}
}
fn read_value<D: DeserializeOwned>(
&self,
id: &Id,
base_key: u64,
sub_key: u64,
) -> Result<Option<D>> {
let db_ptr = match id {
Id::Contract { .. } => self.db.open_sub_db(CONTRACT_SUB_DB)?,
Id::Agent { .. } => self.db.open_sub_db(AGENT_SUB_DB)?,
};
let txn = self.db.begin_ro_txn()?;
let key = StorageKey::system_key(id, base_key, sub_key);
let bytes = txn.read(&db_ptr, &key)?;
let result = match bytes {
Some(val) => Some(postcard::from_bytes(val)?),
None => None,
};
txn.commit()?;
Ok(result)
}
pub fn filter_events(&self, events: Events) -> Result<Events> {
let mut filtered = Events::default();
for cc in events.contracts {
let cid = cc.contract_id;
if !self.contract_exists(&cid)? {
warn!("ContractCall contains a non-existing contract-id");
continue;
}
if self.contract_revoked(&cid)? {
warn!("ContractCall contains a revoked contract-id");
continue;
}
filtered.contracts.push(cc);
}
let sub_handler = self.messages();
for msg in events.local {
if sub_handler
.get_topic_subscribers(msg.publisher, msg.topic.to_string())?
.is_empty()
{
warn!("Message contains topic with no subscribers");
continue;
}
filtered.local.push(msg);
}
Ok(filtered)
}
}
#[cfg(any(feature = "contracts", feature = "agents"))]
pub(crate) fn write_system_value<S: Db, D: serde::Serialize, ID: AsRef<[u8; 16]>>(
db_ptr: &S::Handle,
txn: &mut <S as Db>::RwTx<'_>,
uid: ID,
base_key: u64,
sub_key: u64,
data: &D,
) -> Result<()> {
let key = StorageKey::system_key(uid, base_key, sub_key);
let bytes = postcard::to_allocvec(data)?;
txn.write(db_ptr, &key, &bytes)?;
Ok(())
}
#[cfg(any(feature = "contracts", feature = "agents"))]
pub(crate) fn read_system_value<S: Db, D: DeserializeOwned, ID: AsRef<[u8; 16]>>(
db_ptr: &S::Handle,
txn: &<S as Db>::RwTx<'_>,
cid: ID,
base_key: u64,
sub_key: u64,
) -> Result<Option<D>> {
let key = StorageKey::system_key(cid, base_key, sub_key);
let bytes = txn.read(db_ptr, &key)?;
match bytes {
Some(val) => {
let out = postcard::from_bytes(val)?;
Ok(Some(out))
}
None => Ok(None),
}
}
#[cfg(any(feature = "contracts", feature = "agents"))]
pub(crate) fn write_introduction<S: Db>(
db_ptr: &S::Handle,
txn: &mut <S as Db>::RwTx<'_>,
introduction: borderless::common::Introduction,
) -> Result<()> {
use borderless::__private::storage_keys::*;
use crate::error::ErrorKind;
let id = introduction.id;
let check_id =
read_system_value::<S, Id, _>(db_ptr, txn, &id, BASE_KEY_METADATA, META_SUB_KEY_ID)?;
if check_id.is_some() {
return Err(ErrorKind::DoubleIntroduction.into());
}
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_ID,
&introduction.id,
)?;
let participants: Vec<_> = if let Id::Contract { .. } = &id {
introduction
.participants
.into_iter()
.map(|mut p| {
p.add_alias_to_roles();
p
})
.collect()
} else {
introduction.participants
};
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_PARTICIPANTS,
&participants,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_SINKS,
&introduction.sinks,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_DESC,
&introduction.desc,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_META,
&introduction.meta,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_INIT_STATE,
&introduction.initial_state,
)?;
let (pkg_def, pkg_source) = introduction.package.into_def_and_source();
let pkg_source = pkg_source.flatten();
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_PACKAGE_DEF,
&pkg_def,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&id,
BASE_KEY_METADATA,
META_SUB_KEY_PACKAGE_SOURCE,
&pkg_source,
)?;
Ok(())
}
#[cfg(any(feature = "contracts", feature = "agents"))]
pub(crate) fn write_revocation<S: Db>(
db_ptr: &S::Handle,
txn: &mut <S as Db>::RwTx<'_>,
revocation: &Revocation,
timestamp: u64,
tx_ctx: Option<TxCtx>,
) -> Result<()> {
let cid = revocation.id;
let meta: Option<Metadata> =
read_system_value::<S, _, _>(db_ptr, txn, &cid, BASE_KEY_METADATA, META_SUB_KEY_META)?;
let mut meta = meta.unwrap();
meta.inactive_since = timestamp;
meta.tx_ctx_revocation = tx_ctx;
write_system_value::<S, _, _>(
db_ptr,
txn,
&cid,
BASE_KEY_METADATA,
META_SUB_KEY_META,
&meta,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&cid,
BASE_KEY_METADATA,
META_SUB_KEY_REVOKED_TS,
×tamp,
)?;
write_system_value::<S, _, _>(
db_ptr,
txn,
&cid,
BASE_KEY_METADATA,
META_SUB_KEY_REVOCATION,
&revocation.reason, )?;
Ok(())
}