borderless_runtime/
rt.rs

// Compiled only when the `contracts` feature is enabled.
#[cfg(feature = "contracts")]
pub mod contract;

// Compiled only when the `agents` feature is enabled.
#[cfg(feature = "agents")]
pub mod agent;

// Private `vm` module; compiled as soon as either `contracts` or `agents` is enabled.
#[cfg(any(feature = "contracts", feature = "agents"))]
mod vm;

// Re-export of the code store, available as soon as either runtime feature is enabled.
// NOTE(review): the `code_store` module in this file is gated on the `code-store` feature,
// not on `contracts`/`agents` - presumably both runtime features enable `code-store`
// in Cargo.toml; confirm, otherwise this re-export cannot resolve.
#[cfg(any(feature = "contracts", feature = "agents"))]
pub use code_store::CodeStore;
12
13#[cfg(any(feature = "contracts", feature = "agents"))]
14pub mod factory {
15    use std::num::NonZeroUsize;
16
17    use super::CodeStore;
18    use crate::{AgentLock, AgentRuntime, ContractLock, ContractRuntime, Result};
19    use borderless_kv_store::Db;
20
    /// Create new runtimes with shared code-store, cache and lock.
    ///
    /// This factory is capable of spawning agent runtimes and contract runtimes.
    /// The agent and contract runtimes spawned by this factory have a shared [`CodeStore`],
    /// but separate execution locks (there is one lock for all agents and one lock for all contracts).
    pub struct RtFactory<'a, S: Db> {
        /// Code store handed (as a clone) to every spawned runtime.
        ///
        /// `None` until it is supplied via `with_store`, initialized via `set_cache_size`,
        /// or created on demand by the first `spawn_*` call.
        code_store: Option<CodeStore<S>>,
        /// Lock handed (as a clone) to every spawned contract runtime; created on first use.
        #[cfg(feature = "contracts")]
        lck_contract: Option<ContractLock>,
        /// Lock handed (as a clone) to every spawned agent runtime; created on first use.
        #[cfg(feature = "agents")]
        lck_agent: Option<AgentLock>,
        /// Database handle that is passed to the code store and to every spawned runtime.
        db: &'a S,
    }
34
35    impl<'a, S: Db> RtFactory<'a, S> {
36        /// Creates a new factory
37        pub fn new(db: &'a S) -> Self {
38            Self {
39                code_store: None,
40                #[cfg(feature = "agents")]
41                lck_agent: None,
42                #[cfg(feature = "contracts")]
43                lck_contract: None,
44                db,
45            }
46        }
47
48        /// Creates a new over an existing code store
49        pub fn with_store(db: &'a S, code_store: CodeStore<S>) -> Self {
50            Self {
51                code_store: Some(code_store),
52                #[cfg(feature = "agents")]
53                lck_agent: None,
54                #[cfg(feature = "contracts")]
55                lck_contract: None,
56                db,
57            }
58        }
59
60        /// Sets the cache size and initializes the code-store
61        ///
62        /// # Panic
63        ///
64        /// This function will panic, if the code-store has already been initialized.
65        pub fn set_cache_size(&mut self, cache_size: NonZeroUsize) -> Result<()> {
66            match &mut self.code_store {
67                Some(_cache) => {
68                    panic!("cannot initialize cache twice")
69                }
70                None => {
71                    self.code_store = Some(CodeStore::with_cache_size(self.db, cache_size)?);
72                }
73            }
74            Ok(())
75        }
76
77        /// Creates a new contract runtime
78        #[cfg(feature = "contracts")]
79        pub fn spawn_contract_rt(&mut self) -> Result<ContractRuntime<S>> {
80            if self.code_store.is_none() {
81                self.code_store = Some(CodeStore::new(self.db)?);
82            }
83            let code_store = self.code_store.as_ref().unwrap();
84
85            if self.lck_contract.is_none() {
86                self.lck_contract = Some(ContractLock::default());
87            }
88            let lock = self.lck_contract.as_ref().unwrap();
89
90            Ok(ContractRuntime::new(
91                self.db,
92                code_store.clone(),
93                lock.clone(),
94            )?)
95        }
96
97        /// Creates a new agent runtime
98        #[cfg(feature = "agents")]
99        pub fn spawn_agent_rt(&mut self) -> Result<AgentRuntime<S>> {
100            if self.code_store.is_none() {
101                self.code_store = Some(CodeStore::new(self.db)?);
102            }
103            let code_store = self.code_store.as_ref().unwrap();
104
105            if self.lck_agent.is_none() {
106                self.lck_agent = Some(AgentLock::default());
107            }
108            let lock = self.lck_agent.as_ref().unwrap();
109
110            Ok(AgentRuntime::new(
111                self.db,
112                code_store.clone(),
113                lock.clone(),
114            )?)
115        }
116    }
117}
118
119#[cfg(feature = "code-store")]
120pub mod code_store {
121    use super::vm::VmState;
122    use borderless::{aid_prefix, cid_prefix, AgentId, ContractId};
123    use borderless_kv_store::{Db, RawRead, RawWrite, Tx};
124    use lru::LruCache;
125    use parking_lot::Mutex;
126    use std::{num::NonZeroUsize, sync::Arc};
127    use wasmtime::{Engine, Instance, Linker, Module, Store};
128
129    use crate::log_shim::*;
130    use crate::{Result, WASM_CODE_SUB_DB};
131
    /// Generalized ID - this is either a Contract-ID or an Agent-ID.
    ///
    /// Used as the key type of the in-memory instance cache of [`CodeStore`].
    type Id = [u8; 16];
134
    /// Storage for our webassembly code
    ///
    /// Serialized modules are persisted in the `WASM_CODE_SUB_DB` sub-database of `db`;
    /// instantiated modules are additionally kept in an in-memory LRU cache.
    /// Cloning the store clones the `Arc`, so all clones share the same cache.
    ///
    /// NOTE(review): the cache holds wasmtime `Instance` handles and is shared between
    /// clones - confirm that a cached instance is only ever used together with the
    /// `Store` it was instantiated in.
    #[derive(Clone)]
    pub struct CodeStore<S: Db> {
        /// Handle to the key-value database that holds the serialized modules.
        db: S,
        /// LRU cache of instantiated modules, keyed by the raw contract- or agent-id bytes.
        cache: Arc<Mutex<LruCache<Id, Instance, ahash::RandomState>>>,
    }
141
142    impl<S: Db> CodeStore<S> {
143        pub fn new(db: &S) -> Result<Self> {
144            Self::with_cache_size(db, NonZeroUsize::new(16).unwrap())
145        }
146
147        pub fn with_cache_size(db: &S, cache_size: NonZeroUsize) -> Result<Self> {
148            let _db_ptr = db.create_sub_db(WASM_CODE_SUB_DB)?;
149            let cache = LruCache::with_hasher(cache_size, ahash::RandomState::default());
150            Ok(Self {
151                db: db.clone(),
152                cache: Arc::new(Mutex::new(cache)),
153            })
154        }
155
156        pub fn insert_contract(&self, cid: ContractId, module: Module) -> Result<()> {
157            let module_bytes = module.serialize()?;
158            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
159            let mut txn = self.db.begin_rw_txn()?;
160            txn.write(&db_ptr, &cid, &module_bytes)?;
161            txn.commit()?;
162            Ok(())
163        }
164
165        pub fn insert_swagent(&self, aid: AgentId, module: Module) -> Result<()> {
166            let module_bytes = module.serialize()?;
167            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
168            let mut txn = self.db.begin_rw_txn()?;
169            txn.write(&db_ptr, &aid, &module_bytes)?;
170            txn.commit()?;
171            Ok(())
172        }
173
174        #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid)))]
175        pub fn get_contract(
176            &mut self,
177            cid: &ContractId,
178            engine: &Engine,
179            store: &mut Store<VmState<S>>,
180            linker: &mut Linker<VmState<S>>,
181        ) -> Result<Option<Instance>> {
182            let start = std::time::Instant::now();
183            if let Some(instance) = self.cache.lock().get(cid.as_bytes()) {
184                let elapsed = start.elapsed();
185                debug!("Served cached module in {elapsed:?}");
186                return Ok(Some(*instance));
187            }
188            let module = match self.read_module(cid, engine)? {
189                Some(m) => m,
190                None => return Ok(None),
191            };
192            let elapsed = start.elapsed();
193            debug!("Read module in {elapsed:?}");
194            let start = std::time::Instant::now();
195            let instance = linker.instantiate(store, &module)?;
196            self.cache.lock().push(*cid.as_bytes(), instance);
197            let elapsed = start.elapsed();
198            debug!("Instantiated module in {elapsed:?}");
199            Ok(Some(instance))
200        }
201
202        #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid)))]
203        pub async fn get_agent(
204            &mut self,
205            aid: &AgentId,
206            engine: &Engine,
207            store: &mut Store<VmState<S>>,
208            linker: &mut Linker<VmState<S>>,
209        ) -> Result<Option<Instance>> {
210            let start = std::time::Instant::now();
211            if let Some(instance) = self.cache.lock().get(aid.as_bytes()) {
212                let elapsed = start.elapsed();
213                debug!("Served cached module in {elapsed:?}");
214                return Ok(Some(*instance));
215            }
216            let module = match self.read_module(aid, engine)? {
217                Some(m) => m,
218                None => return Ok(None),
219            };
220            let elapsed = start.elapsed();
221            debug!("Read module in {elapsed:?}");
222            let start = std::time::Instant::now();
223            let instance = linker.instantiate_async(store, &module).await?;
224            self.cache.lock().push(*aid.as_bytes(), instance);
225            let elapsed = start.elapsed();
226            debug!("Instantiated module in {elapsed:?}");
227            Ok(Some(instance))
228        }
229
230        /// Helper function to read a module from the kv-storage
231        ///
232        /// Note: This helper function is required, because otherwise the compiler might complain
233        /// that `RoTx` does not implement `Send`, as it cannot figure out on its own,
234        /// that the transaction is dropped before the next `.await` point.
235        fn read_module(
236            &mut self,
237            key: impl AsRef<[u8]>,
238            engine: &Engine,
239        ) -> Result<Option<Module>> {
240            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
241            let txn = self.db.begin_ro_txn()?;
242            let module_bytes = txn.read(&db_ptr, &key)?;
243            let module = match module_bytes {
244                Some(bytes) => unsafe { Module::deserialize(engine, bytes)? },
245                None => return Ok(None),
246            };
247            txn.commit()?;
248            Ok(Some(module))
249        }
250
251        pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
252            use borderless_kv_store::*;
253
254            let mut out = Vec::new();
255            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
256            let txn = self.db.begin_ro_txn()?;
257            let mut cursor = txn.ro_cursor(&db_ptr)?;
258            // NOTE: We have to filter out all keys without the cid prefix
259            for (key, _value) in cursor.iter().filter(|(key, _)| cid_prefix(key)) {
260                let cid =
261                    ContractId::from_bytes(key.try_into().map_err(|_| {
262                        crate::Error::msg("failed to parse contract-id from storage")
263                    })?);
264                out.push(cid);
265            }
266            drop(cursor);
267            txn.commit()?;
268            Ok(out)
269        }
270
271        pub fn available_swagents(&self) -> Result<Vec<AgentId>> {
272            use borderless_kv_store::*;
273
274            let mut out = Vec::new();
275            let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
276            let txn = self.db.begin_ro_txn()?;
277            let mut cursor = txn.ro_cursor(&db_ptr)?;
278            // NOTE: We have to filter out all keys without the aid prefix
279            for (key, _value) in cursor.iter().filter(|(key, _)| aid_prefix(key)) {
280                let aid = AgentId::from_bytes(
281                    key.try_into()
282                        .map_err(|_| crate::Error::msg("failed to parse agent-id from storage"))?,
283                );
284                out.push(aid);
285            }
286            drop(cursor);
287            txn.commit()?;
288            Ok(out)
289        }
290    }
291}