1use super::{
2 action_log::{ActionLog, ActionRecord, RelTxAction},
3 ledger::Ledger,
4 logger::Logger,
5 subscriptions::SubscriptionHandler,
6};
7use crate::log_shim::warn;
8use crate::{Result, ACTION_TX_REL_SUB_DB, AGENT_SUB_DB, CONTRACT_SUB_DB};
9use borderless::common::Participant;
10use borderless::events::{Events, Topic};
11use borderless::{
12 common::{Description, Metadata, Revocation},
13 contracts::Info,
14 ContractId,
15 __private::storage_keys::*,
16 events::Sink,
17 hash::Hash256,
18 http::{AgentInfo, ContractInfo},
19 pkg::{Source, SourceFlattened, WasmPkg, WasmPkgNoSource},
20 prelude::{Id, TxCtx},
21 AgentId, TxIdentifier,
22};
23use borderless_kv_store::*;
24use serde::de::DeserializeOwned;
25
26pub struct Controller<'a, S: Db> {
28 db: &'a S,
29}
30
31impl<'a, S: Db> Controller<'a, S> {
32 pub fn new(db: &'a S) -> Self {
33 Self { db }
34 }
35
36 pub fn actions(&self, cid: ContractId) -> ActionLog<'a, S> {
38 ActionLog::new(self.db, cid)
39 }
40
41 pub fn logs(&self, id: impl Into<Id>) -> Logger<'a, S> {
43 Logger::new(self.db, id)
44 }
45
46 pub fn ledger(&self) -> Ledger<'a, S> {
48 Ledger::new(self.db)
49 }
50
51 pub fn messages(&self) -> SubscriptionHandler<'a, S> {
53 SubscriptionHandler::new(self.db)
54 }
55
56 pub fn contract_participants(&self, cid: &ContractId) -> Result<Option<Vec<Participant>>> {
58 self.read_value(
59 &Id::contract(*cid),
60 BASE_KEY_METADATA,
61 META_SUB_KEY_PARTICIPANTS,
62 )
63 }
64
65 pub fn contract_exists(&self, cid: &ContractId) -> Result<bool> {
67 Ok(self
68 .read_value::<ContractId>(&Id::contract(*cid), BASE_KEY_METADATA, META_SUB_KEY_ID)?
69 .is_some())
70 }
71
72 pub fn agent_exists(&self, aid: &AgentId) -> Result<bool> {
74 Ok(self
75 .read_value::<AgentId>(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_ID)?
76 .is_some())
77 }
78
79 pub fn contract_revoked(&self, cid: &ContractId) -> Result<bool> {
81 Ok(self.contract_revoked_ts(cid)?.is_some())
82 }
83
84 pub fn agent_revoked(&self, aid: &AgentId) -> Result<bool> {
86 Ok(self.agent_revoked_ts(aid)?.is_some())
87 }
88
89 pub fn contract_revoked_ts(&self, cid: &ContractId) -> Result<Option<u64>> {
91 self.read_value::<u64>(
92 &Id::contract(*cid),
93 BASE_KEY_METADATA,
94 META_SUB_KEY_REVOKED_TS,
95 )
96 }
97
98 pub fn agent_revoked_ts(&self, aid: &AgentId) -> Result<Option<u64>> {
100 self.read_value::<u64>(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_REVOKED_TS)
101 }
102
103 pub fn contract_last_tx_hash(&self, cid: &ContractId) -> Result<Option<Hash256>> {
105 let actions = ActionLog::new(self.db, *cid);
106 if let Some(action) = actions.last()? {
107 return Ok(Some(action.tx_ctx.tx_id.hash));
108 }
109 match self.contract_meta(cid)? {
112 Some(meta) => Ok(meta.tx_ctx_introduction.map(|t| t.tx_id.hash)),
113 None => Ok(None),
114 }
115 }
116
117 pub fn contract_info(&self, cid: &ContractId) -> Result<Option<Info>> {
119 let id = Id::contract(*cid);
120 let participants = self.read_value(&id, BASE_KEY_METADATA, META_SUB_KEY_PARTICIPANTS)?;
121 let sinks = self.read_value(&id, BASE_KEY_METADATA, META_SUB_KEY_SINKS)?;
122 match (participants, sinks) {
123 (Some(participants), Some(sinks)) => Ok(Some(Info {
124 contract_id: *cid,
125 participants,
126 sinks,
127 })),
128 _ => Ok(None),
129 }
130 }
131
132 pub fn agent_sinks(&self, aid: &AgentId) -> Result<Option<Vec<Sink>>> {
137 let aid = Id::agent(*aid);
138 self.read_value(&aid, BASE_KEY_METADATA, META_SUB_KEY_SINKS)
139 }
140
141 pub fn agent_subs(&self, aid: &AgentId) -> Result<Vec<Topic>> {
142 Ok(self.messages().get_subscriptions(*aid)?)
143 }
144
145 pub fn contract_desc(&self, cid: &ContractId) -> Result<Option<Description>> {
147 self.read_value(&Id::contract(*cid), BASE_KEY_METADATA, META_SUB_KEY_DESC)
148 }
149
150 pub fn agent_desc(&self, aid: &AgentId) -> Result<Option<Description>> {
152 self.read_value(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_DESC)
153 }
154
155 pub fn contract_meta(&self, cid: &ContractId) -> Result<Option<Metadata>> {
157 self.read_value(&Id::contract(*cid), BASE_KEY_METADATA, META_SUB_KEY_META)
158 }
159
160 pub fn agent_meta(&self, aid: &AgentId) -> Result<Option<Metadata>> {
162 self.read_value(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_META)
163 }
164
165 pub fn contract_full(&self, cid: &ContractId) -> Result<Option<ContractInfo>> {
167 let info = self.contract_info(cid)?;
168 let desc = self.contract_desc(cid)?;
169 let meta = self.contract_meta(cid)?;
170 Ok(Some(ContractInfo { info, desc, meta }))
171 }
172
173 pub fn agent_full(&self, aid: &AgentId) -> Result<Option<AgentInfo>> {
175 let sinks = self.agent_sinks(aid)?.unwrap_or_default();
176 let subs = self.agent_subs(aid)?;
177 let desc = self.agent_desc(aid)?;
178 let meta = self.agent_meta(aid)?;
179 Ok(Some(AgentInfo {
180 agent_id: *aid,
181 sinks,
182 subs,
183 desc,
184 meta,
185 }))
186 }
187
188 pub fn contract_revocation(&self, cid: &ContractId) -> Result<Option<Revocation>> {
190 self.read_value(
191 &Id::contract(*cid),
192 BASE_KEY_METADATA,
193 META_SUB_KEY_REVOCATION,
194 )
195 }
196
197 pub fn agent_revocation(&self, aid: &AgentId) -> Result<Option<Revocation>> {
199 self.read_value(&Id::agent(*aid), BASE_KEY_METADATA, META_SUB_KEY_REVOCATION)
200 }
201
202 pub fn query_action(&self, tx_id: &TxIdentifier) -> Result<Option<ActionRecord>> {
204 let tx_id_bytes = tx_id.to_bytes();
205 let relation = {
206 let rel_db = self.db.create_sub_db(ACTION_TX_REL_SUB_DB)?;
207 let txn = self.db.begin_ro_txn()?;
208 match txn.read(&rel_db, &tx_id_bytes)? {
209 Some(bytes) => RelTxAction::from_bytes(bytes),
210 None => return Ok(None),
211 }
212 };
213 match self
215 .actions(relation.cid)
216 .get(relation.action_idx as usize)?
217 {
218 Some(record) => {
219 debug_assert!(record.tx_ctx.tx_id == *tx_id, "tx-id must match");
220 Ok(Some(record))
221 }
222 None => Ok(None),
223 }
224 }
225
226 pub fn agent_pkg_def(&self, aid: &AgentId) -> Result<Option<WasmPkgNoSource>> {
228 self.read_value(
229 &Id::agent(*aid),
230 BASE_KEY_METADATA,
231 META_SUB_KEY_PACKAGE_DEF,
232 )
233 }
234
235 pub fn agent_pkg_source(&self, aid: &AgentId) -> Result<Option<Source>> {
237 let source: Option<SourceFlattened> = self.read_value(
238 &Id::agent(*aid),
239 BASE_KEY_METADATA,
240 META_SUB_KEY_PACKAGE_SOURCE,
241 )?;
242 Ok(source.map(|s| s.unflatten()))
243 }
244
245 pub fn agent_pkg_full(&self, aid: &AgentId) -> Result<Option<WasmPkg>> {
247 let pkg_def = self.agent_pkg_def(aid)?;
248 let source = self.agent_pkg_source(aid)?;
249 match (pkg_def, source) {
250 (Some(pkg), Some(source)) => Ok(Some(WasmPkg::from_def_and_source(pkg, source))),
251 _ => Ok(None),
252 }
253 }
254
255 pub fn contract_pkg_def(&self, aid: &ContractId) -> Result<Option<WasmPkgNoSource>> {
257 self.read_value(
258 &Id::contract(*aid),
259 BASE_KEY_METADATA,
260 META_SUB_KEY_PACKAGE_DEF,
261 )
262 }
263
264 pub fn contract_pkg_source(&self, aid: &ContractId) -> Result<Option<Source>> {
266 let source: Option<SourceFlattened> = self.read_value(
268 &Id::contract(*aid),
269 BASE_KEY_METADATA,
270 META_SUB_KEY_PACKAGE_SOURCE,
271 )?;
272 Ok(source.map(|s| s.unflatten()))
273 }
274
275 pub fn contract_pkg_full(&self, aid: &ContractId) -> Result<Option<WasmPkg>> {
277 let pkg_def = self.contract_pkg_def(aid)?;
278 let source = self.contract_pkg_source(aid)?;
279 match (pkg_def, source) {
280 (Some(pkg), Some(source)) => Ok(Some(WasmPkg::from_def_and_source(pkg, source))),
281 _ => Ok(None),
282 }
283 }
284
285 fn read_value<D: DeserializeOwned>(
286 &self,
287 id: &Id,
288 base_key: u64,
289 sub_key: u64,
290 ) -> Result<Option<D>> {
291 let db_ptr = match id {
293 Id::Contract { .. } => self.db.open_sub_db(CONTRACT_SUB_DB)?,
294 Id::Agent { .. } => self.db.open_sub_db(AGENT_SUB_DB)?,
295 };
296 let txn = self.db.begin_ro_txn()?;
297 let key = StorageKey::system_key(id, base_key, sub_key);
298 let bytes = txn.read(&db_ptr, &key)?;
299 let result = match bytes {
300 Some(val) => Some(postcard::from_bytes(val)?),
301 None => None,
302 };
303 txn.commit()?;
304 Ok(result)
305 }
306
307 pub fn filter_events(&self, events: Events) -> Result<Events> {
316 let mut filtered = Events::default();
317 for cc in events.contracts {
319 let cid = cc.contract_id;
320 if !self.contract_exists(&cid)? {
322 warn!("ContractCall contains a non-existing contract-id");
323 continue;
324 }
325 if self.contract_revoked(&cid)? {
327 warn!("ContractCall contains a revoked contract-id");
328 continue;
329 }
330 filtered.contracts.push(cc);
331 }
332 let sub_handler = self.messages();
334 for msg in events.local {
335 if sub_handler
336 .get_topic_subscribers(msg.publisher, msg.topic.to_string())?
337 .is_empty()
338 {
339 warn!("Message contains topic with no subscribers");
340 continue;
341 }
342 filtered.local.push(msg);
343 }
344
345 Ok(filtered)
346 }
347}
348
349#[cfg(any(feature = "contracts", feature = "agents"))]
351pub(crate) fn write_system_value<S: Db, D: serde::Serialize, ID: AsRef<[u8; 16]>>(
352 db_ptr: &S::Handle,
353 txn: &mut <S as Db>::RwTx<'_>,
354 uid: ID,
355 base_key: u64,
356 sub_key: u64,
357 data: &D,
358) -> Result<()> {
359 let key = StorageKey::system_key(uid, base_key, sub_key);
360 let bytes = postcard::to_allocvec(data)?;
361 txn.write(db_ptr, &key, &bytes)?;
362 Ok(())
363}
364
365#[cfg(any(feature = "contracts", feature = "agents"))]
367pub(crate) fn read_system_value<S: Db, D: DeserializeOwned, ID: AsRef<[u8; 16]>>(
368 db_ptr: &S::Handle,
369 txn: &<S as Db>::RwTx<'_>,
370 cid: ID,
371 base_key: u64,
372 sub_key: u64,
373) -> Result<Option<D>> {
374 let key = StorageKey::system_key(cid, base_key, sub_key);
375 let bytes = txn.read(db_ptr, &key)?;
376 match bytes {
377 Some(val) => {
378 let out = postcard::from_bytes(val)?;
379 Ok(Some(out))
380 }
381 None => Ok(None),
382 }
383}
384
385#[cfg(any(feature = "contracts", feature = "agents"))]
386pub(crate) fn write_introduction<S: Db>(
387 db_ptr: &S::Handle,
388 txn: &mut <S as Db>::RwTx<'_>,
389 introduction: borderless::common::Introduction,
390) -> Result<()> {
391 use borderless::__private::storage_keys::*;
392
393 use crate::error::ErrorKind;
394 let id = introduction.id;
395
396 let check_id =
399 read_system_value::<S, Id, _>(db_ptr, txn, &id, BASE_KEY_METADATA, META_SUB_KEY_ID)?;
400 if check_id.is_some() {
401 return Err(ErrorKind::DoubleIntroduction.into());
402 }
403
404 write_system_value::<S, _, _>(
406 db_ptr,
407 txn,
408 &id,
409 BASE_KEY_METADATA,
410 META_SUB_KEY_ID,
411 &introduction.id,
412 )?;
413
414 let participants: Vec<_> = if let Id::Contract { .. } = &id {
416 introduction
418 .participants
419 .into_iter()
420 .map(|mut p| {
421 p.add_alias_to_roles();
422 p
423 })
424 .collect()
425 } else {
426 introduction.participants
427 };
428 write_system_value::<S, _, _>(
430 db_ptr,
431 txn,
432 &id,
433 BASE_KEY_METADATA,
434 META_SUB_KEY_PARTICIPANTS,
435 &participants,
436 )?;
437
438 write_system_value::<S, _, _>(
440 db_ptr,
441 txn,
442 &id,
443 BASE_KEY_METADATA,
444 META_SUB_KEY_SINKS,
445 &introduction.sinks,
446 )?;
447
448 write_system_value::<S, _, _>(
450 db_ptr,
451 txn,
452 &id,
453 BASE_KEY_METADATA,
454 META_SUB_KEY_DESC,
455 &introduction.desc,
456 )?;
457
458 write_system_value::<S, _, _>(
460 db_ptr,
461 txn,
462 &id,
463 BASE_KEY_METADATA,
464 META_SUB_KEY_META,
465 &introduction.meta,
466 )?;
467
468 write_system_value::<S, _, _>(
470 db_ptr,
471 txn,
472 &id,
473 BASE_KEY_METADATA,
474 META_SUB_KEY_INIT_STATE,
475 &introduction.initial_state,
476 )?;
477
478 let (pkg_def, pkg_source) = introduction.package.into_def_and_source();
480 let pkg_source = pkg_source.flatten();
481
482 write_system_value::<S, _, _>(
484 db_ptr,
485 txn,
486 &id,
487 BASE_KEY_METADATA,
488 META_SUB_KEY_PACKAGE_DEF,
489 &pkg_def,
490 )?;
491
492 write_system_value::<S, _, _>(
494 db_ptr,
495 txn,
496 &id,
497 BASE_KEY_METADATA,
498 META_SUB_KEY_PACKAGE_SOURCE,
499 &pkg_source,
500 )?;
501
502 Ok(())
503}
504
505#[cfg(any(feature = "contracts", feature = "agents"))]
506pub(crate) fn write_revocation<S: Db>(
507 db_ptr: &S::Handle,
508 txn: &mut <S as Db>::RwTx<'_>,
509 revocation: &Revocation,
510 timestamp: u64,
511 tx_ctx: Option<TxCtx>,
512) -> Result<()> {
513 let cid = revocation.id;
514 let meta: Option<Metadata> =
516 read_system_value::<S, _, _>(db_ptr, txn, &cid, BASE_KEY_METADATA, META_SUB_KEY_META)?;
517 let mut meta = meta.unwrap();
518
519 meta.inactive_since = timestamp;
520 meta.tx_ctx_revocation = tx_ctx;
521
522 write_system_value::<S, _, _>(
523 db_ptr,
524 txn,
525 &cid,
526 BASE_KEY_METADATA,
527 META_SUB_KEY_META,
528 &meta,
529 )?;
530 write_system_value::<S, _, _>(
531 db_ptr,
532 txn,
533 &cid,
534 BASE_KEY_METADATA,
535 META_SUB_KEY_REVOKED_TS,
536 ×tamp,
537 )?;
538 write_system_value::<S, _, _>(
539 db_ptr,
540 txn,
541 &cid,
542 BASE_KEY_METADATA,
543 META_SUB_KEY_REVOCATION,
544 &revocation.reason, )?;
546 Ok(())
547}