Skip to main content

borderless_runtime/
rt.rs

1#[cfg(feature = "contracts")]
2pub mod contract;
3
4#[cfg(feature = "agents")]
5pub mod agent;
6
7#[cfg(any(feature = "contracts", feature = "agents"))]
8mod vm;
9
10#[cfg(any(feature = "contracts", feature = "agents"))]
11pub use code_store::CodeStore;
12
#[cfg(any(feature = "contracts", feature = "agents"))]
pub mod factory {
    use std::num::NonZeroUsize;

    use super::CodeStore;
    use crate::{AgentLock, AgentRuntime, ContractLock, ContractRuntime, Result};
    use borderless_kv_store::Db;

    /// Create new runtimes with shared code-store, cache and lock.
    ///
    /// This factory is capable of spawning agent runtimes and contract runtimes.
    /// The agent and contract runtimes spawned by this factory share a single [`CodeStore`],
    /// but use separate execution locks (there is one lock for all agents and one lock for all
    /// contracts).
    ///
    /// The code-store and both locks are created lazily, the first time they are needed.
    pub struct RtFactory<'a, S: Db> {
        /// Shared code-store; `None` until it is supplied, configured or first needed.
        code_store: Option<CodeStore<S>>,
        /// Lock handed to every contract runtime; created on the first spawn.
        #[cfg(feature = "contracts")]
        lck_contract: Option<ContractLock>,
        /// Lock handed to every agent runtime; created on the first spawn.
        #[cfg(feature = "agents")]
        lck_agent: Option<AgentLock>,
        /// Database handle passed to the code-store and to every spawned runtime.
        db: &'a S,
    }

    impl<'a, S: Db> RtFactory<'a, S> {
        /// Creates a new factory.
        ///
        /// Neither the code-store nor the locks are created here; see
        /// [`set_cache_size`](Self::set_cache_size) and the `spawn_*` methods.
        pub fn new(db: &'a S) -> Self {
            Self {
                code_store: None,
                #[cfg(feature = "agents")]
                lck_agent: None,
                #[cfg(feature = "contracts")]
                lck_contract: None,
                db,
            }
        }

        /// Creates a new factory over an existing code store.
        ///
        /// Because the code-store is already initialized, calling
        /// [`set_cache_size`](Self::set_cache_size) on the returned factory panics.
        pub fn with_store(db: &'a S, code_store: CodeStore<S>) -> Self {
            Self {
                code_store: Some(code_store),
                #[cfg(feature = "agents")]
                lck_agent: None,
                #[cfg(feature = "contracts")]
                lck_contract: None,
                db,
            }
        }

        /// Sets the cache size and initializes the code-store.
        ///
        /// # Errors
        ///
        /// Returns the error of [`CodeStore::with_cache_size`] if the code-store cannot be created.
        ///
        /// # Panics
        ///
        /// This function will panic, if the code-store has already been initialized
        /// (by [`with_store`](Self::with_store), a previous call to this function,
        /// or by spawning a runtime).
        pub fn set_cache_size(&mut self, cache_size: NonZeroUsize) -> Result<()> {
            assert!(
                self.code_store.is_none(),
                "cannot initialize cache twice"
            );
            self.code_store = Some(CodeStore::with_cache_size(self.db, cache_size)?);
            Ok(())
        }

        /// Returns a handle to the shared code-store, creating it with
        /// [`CodeStore::new`] if it does not exist yet.
        fn shared_code_store(&mut self) -> Result<CodeStore<S>> {
            if let Some(store) = &self.code_store {
                return Ok(store.clone());
            }
            let store = CodeStore::new(self.db)?;
            self.code_store = Some(store.clone());
            Ok(store)
        }

        /// Creates a new contract runtime.
        ///
        /// The runtime receives a clone of the shared code-store and a clone of the
        /// contract lock; both are created on first use.
        ///
        /// # Errors
        ///
        /// Fails if the code-store has to be created and that fails, or if
        /// [`ContractRuntime::new`] fails.
        #[cfg(feature = "contracts")]
        pub fn spawn_contract_rt(&mut self) -> Result<ContractRuntime<S>> {
            // Initialize the code-store first, so a failure leaves the lock untouched.
            let code_store = self.shared_code_store()?;
            let lock: &ContractLock = self.lck_contract.get_or_insert_with(ContractLock::default);

            ContractRuntime::new(self.db, code_store, lock.clone())
        }

        /// Creates a new agent runtime.
        ///
        /// The runtime receives a clone of the shared code-store and a clone of the
        /// agent lock; both are created on first use.
        ///
        /// # Errors
        ///
        /// Fails if the code-store has to be created and that fails, or if
        /// [`AgentRuntime::new`] fails.
        #[cfg(feature = "agents")]
        pub fn spawn_agent_rt(&mut self) -> Result<AgentRuntime<S>> {
            // Initialize the code-store first, so a failure leaves the lock untouched.
            let code_store = self.shared_code_store()?;
            let lock: &AgentLock = self.lck_agent.get_or_insert_with(AgentLock::default);

            AgentRuntime::new(self.db, code_store, lock.clone())
        }
    }
}
110
#[cfg(feature = "code-store")]
pub mod code_store {
    use super::vm::VmState;
    use borderless::{aid_prefix, cid_prefix, AgentId, ContractId};
    use borderless_kv_store::{Db, RawRead, RawWrite, Tx};
    use lru::LruCache;
    use parking_lot::Mutex;
    use std::time::Instant;
    use std::{num::NonZeroUsize, sync::Arc};
    use wasmtime::{Engine, Instance, Linker, Module, Store};

    use crate::{log_shim::*, AGENT_SUB_DB, CONTRACT_SUB_DB};
    use crate::{Result, WASM_CODE_SUB_DB};

    /// Generalized ID - this is either a Contract-ID or an Agent-ID
    type Id = [u8; 16];

    /// Number of modules the cache holds when no explicit size is requested.
    const DEFAULT_CACHE_SIZE: usize = 16;

    /// Storage for our webassembly code
    ///
    /// Serialized modules are persisted in the `WASM_CODE_SUB_DB` sub-database, keyed by
    /// contract- or agent-id. Deserialized modules are kept in an LRU cache.
    ///
    /// Cloning a `CodeStore` clones the db-handle and shares the same cache.
    #[derive(Clone)]
    pub struct CodeStore<S: Db> {
        db: S,
        cache: Arc<Mutex<LruCache<Id, Module, ahash::RandomState>>>,
    }

    impl<S: Db> CodeStore<S> {
        /// Creates a code-store with the default cache size (16 modules).
        ///
        /// # Errors
        ///
        /// See [`with_cache_size`](Self::with_cache_size).
        pub fn new(db: &S) -> Result<Self> {
            let cache_size =
                NonZeroUsize::new(DEFAULT_CACHE_SIZE).expect("default cache size is non-zero");
            Self::with_cache_size(db, cache_size)
        }

        /// Creates a code-store whose cache holds at most `cache_size` modules.
        ///
        /// # Errors
        ///
        /// Fails if the `WASM_CODE_SUB_DB` sub-database cannot be created.
        pub fn with_cache_size(db: &S, cache_size: NonZeroUsize) -> Result<Self> {
            // The handle itself is not needed here; the call only makes sure the sub-db
            // exists, so that later `open_sub_db` calls can find it.
            let _db_ptr = db.create_sub_db(WASM_CODE_SUB_DB)?;
            let cache = LruCache::with_hasher(cache_size, ahash::RandomState::default());
            Ok(Self {
                db: db.clone(),
                cache: Arc::new(Mutex::new(cache)),
            })
        }

        /// Creates a wasmtime [`Store`] with a fresh [`VmState`] for the given engine.
        ///
        /// The sub-database handed to the state is chosen by `engine.is_async()`:
        /// async engines get `AGENT_SUB_DB`, all others get `CONTRACT_SUB_DB`.
        ///
        /// # Errors
        ///
        /// Fails if the selected sub-database cannot be opened.
        pub fn create_store(&self, engine: &Engine) -> Result<Store<VmState<S>>> {
            // TODO: Select correct sub-db based on entity type
            // ( do we want to use the engine here ? )
            let db_ptr = if engine.is_async() {
                self.db.open_sub_db(AGENT_SUB_DB)?
            } else {
                self.db.open_sub_db(CONTRACT_SUB_DB)?
            };
            let state = VmState::new(self.db.clone(), db_ptr);
            let store = Store::new(engine, state);
            Ok(store)
        }

        /// Serializes `module` and persists it under the contract-id `cid`.
        ///
        /// After the write has been committed, any cached module for `cid` is evicted,
        /// so that the next lookup reads the newly written code instead of a stale entry.
        ///
        /// # Errors
        ///
        /// Fails if the module cannot be serialized or the database write fails.
        pub fn insert_contract(&self, cid: ContractId, module: Module) -> Result<()> {
            let module_bytes = module.serialize()?;
            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
            let mut txn = self.db.begin_rw_txn()?;
            txn.write(&db_ptr, &cid, &module_bytes)?;
            txn.commit()?;
            self.evict_cached(cid.as_bytes());
            Ok(())
        }

        /// Serializes `module` and persists it under the agent-id `aid`.
        ///
        /// After the write has been committed, any cached module for `aid` is evicted,
        /// so that the next lookup reads the newly written code instead of a stale entry.
        ///
        /// # Errors
        ///
        /// Fails if the module cannot be serialized or the database write fails.
        pub fn insert_swagent(&self, aid: AgentId, module: Module) -> Result<()> {
            let module_bytes = module.serialize()?;
            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
            let mut txn = self.db.begin_rw_txn()?;
            txn.write(&db_ptr, &aid, &module_bytes)?;
            txn.commit()?;
            self.evict_cached(aid.as_bytes());
            Ok(())
        }

        /// Removes the cached module for `key`, if there is one.
        fn evict_cached(&self, key: &Id) {
            self.cache.lock().pop(key);
        }

        /// Loads the module stored under `cid` and instantiates it with `linker`.
        ///
        /// Returns `Ok(None)` if no module is stored under `cid`. Otherwise returns the
        /// instance together with the store created by [`create_store`](Self::create_store).
        ///
        /// # Errors
        ///
        /// Fails if reading or deserializing the module, creating the store,
        /// or instantiating the module fails.
        #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid)))]
        pub fn get_contract(
            &mut self,
            cid: &ContractId,
            engine: &Engine,
            linker: &mut Linker<VmState<S>>,
        ) -> Result<Option<(Instance, Store<VmState<S>>)>> {
            let start = Instant::now();
            let module = match self.read_module(cid.as_bytes(), engine)? {
                Some(m) => m,
                None => return Ok(None),
            };
            let elapsed = start.elapsed();
            debug!("Read module in {elapsed:?}");
            let start = Instant::now();
            let mut store = self.create_store(engine)?;
            let instance = linker.instantiate(&mut store, &module)?;
            let elapsed = start.elapsed();
            debug!("Instantiated module in {elapsed:?}");
            Ok(Some((instance, store)))
        }

        /// Loads the module stored under `aid` and instantiates it asynchronously with `linker`.
        ///
        /// Returns `Ok(None)` if no module is stored under `aid`. Otherwise returns the
        /// instance together with the store created by [`create_store`](Self::create_store).
        ///
        /// # Errors
        ///
        /// Fails if reading or deserializing the module, creating the store,
        /// or instantiating the module fails.
        #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid)))]
        pub async fn get_agent(
            &mut self,
            aid: &AgentId,
            engine: &Engine,
            linker: &mut Linker<VmState<S>>,
        ) -> Result<Option<(Instance, Store<VmState<S>>)>> {
            let start = Instant::now();
            let module = match self.read_module(aid.as_bytes(), engine)? {
                Some(m) => m,
                None => return Ok(None),
            };
            let elapsed = start.elapsed();
            debug!("Read module in {elapsed:?}");
            let start = Instant::now();
            let mut store = self.create_store(engine)?;
            let instance = linker.instantiate_async(&mut store, &module).await?;
            let elapsed = start.elapsed();
            debug!("Instantiated module in {elapsed:?}");
            Ok(Some((instance, store)))
        }

        /// Helper function to read a module from the cache or, on a miss, from the kv-storage
        ///
        /// A module read from the kv-storage is pushed into the cache before it is returned.
        /// Returns `Ok(None)` if nothing is stored under `key`.
        ///
        /// Note: This helper function is required, because otherwise the compiler might complain
        /// that `RoTx` does not implement `Send`, as it cannot figure out on its own,
        /// that the transaction is dropped before the next `.await` point.
        fn read_module(&self, key: &[u8; 16], engine: &Engine) -> Result<Option<Module>> {
            if let Some(module) = self.cache.lock().get(key.as_ref()) {
                return Ok(Some(module.clone()));
            }
            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
            let txn = self.db.begin_ro_txn()?;
            let module_bytes = txn.read(&db_ptr, &key)?;
            let module = match module_bytes {
                // SAFETY: `Module::deserialize` trusts its input. The bytes come from
                // `WASM_CODE_SUB_DB`, which this type only fills with the output of
                // `Module::serialize` (see `insert_contract` / `insert_swagent`).
                // NOTE(review): this assumes nothing else writes to that sub-db and that
                // `engine` is compatible with the engine used for serialization - confirm.
                Some(bytes) => unsafe { Module::deserialize(engine, bytes)? },
                None => return Ok(None),
            };
            txn.commit()?;
            // Insert module into cache
            self.cache.lock().push(*key, module.clone());
            Ok(Some(module))
        }

        /// Lists the ids of all contracts that have code in the store.
        ///
        /// # Errors
        ///
        /// Fails on database errors, or if a key with the contract-id prefix
        /// cannot be converted into a [`ContractId`].
        pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
            use borderless_kv_store::*;

            let mut out = Vec::new();
            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
            let txn = self.db.begin_ro_txn()?;
            let mut cursor = txn.ro_cursor(&db_ptr)?;
            // NOTE: We have to filter out all keys without the cid prefix
            for (key, _value) in cursor.iter().filter(|(key, _)| cid_prefix(key)) {
                let cid =
                    ContractId::from_bytes(key.try_into().map_err(|_| {
                        crate::Error::msg("failed to parse contract-id from storage")
                    })?);
                out.push(cid);
            }
            // The cursor is dropped explicitly before the transaction is committed.
            drop(cursor);
            txn.commit()?;
            Ok(out)
        }

        /// Lists the ids of all agents that have code in the store.
        ///
        /// # Errors
        ///
        /// Fails on database errors, or if a key with the agent-id prefix
        /// cannot be converted into an [`AgentId`].
        pub fn available_swagents(&self) -> Result<Vec<AgentId>> {
            use borderless_kv_store::*;

            let mut out = Vec::new();
            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
            let txn = self.db.begin_ro_txn()?;
            let mut cursor = txn.ro_cursor(&db_ptr)?;
            // NOTE: We have to filter out all keys without the aid prefix
            for (key, _value) in cursor.iter().filter(|(key, _)| aid_prefix(key)) {
                let aid = AgentId::from_bytes(
                    key.try_into()
                        .map_err(|_| crate::Error::msg("failed to parse agent-id from storage"))?,
                );
                out.push(aid);
            }
            // The cursor is dropped explicitly before the transaction is committed.
            drop(cursor);
            txn.commit()?;
            Ok(out)
        }

        /// Returns a copy of the underlying db-handle
        pub fn get_db(&self) -> S {
            self.db.clone()
        }
    }
}