1#[cfg(feature = "contracts")]
2pub mod contract;
3
4#[cfg(feature = "agents")]
5pub mod agent;
6
7#[cfg(any(feature = "contracts", feature = "agents"))]
8mod vm;
9
10#[cfg(any(feature = "contracts", feature = "agents"))]
11pub use code_store::CodeStore;
12
13#[cfg(any(feature = "contracts", feature = "agents"))]
14pub mod factory {
15 use std::num::NonZeroUsize;
16
17 use super::CodeStore;
18 use crate::{AgentLock, AgentRuntime, ContractLock, ContractRuntime, Result};
19 use borderless_kv_store::Db;
20
21 pub struct RtFactory<'a, S: Db> {
27 code_store: Option<CodeStore<S>>,
28 #[cfg(feature = "contracts")]
29 lck_contract: Option<ContractLock>,
30 #[cfg(feature = "agents")]
31 lck_agent: Option<AgentLock>,
32 db: &'a S,
33 }
34
35 impl<'a, S: Db> RtFactory<'a, S> {
36 pub fn new(db: &'a S) -> Self {
38 Self {
39 code_store: None,
40 #[cfg(feature = "agents")]
41 lck_agent: None,
42 #[cfg(feature = "contracts")]
43 lck_contract: None,
44 db,
45 }
46 }
47
48 pub fn with_store(db: &'a S, code_store: CodeStore<S>) -> Self {
50 Self {
51 code_store: Some(code_store),
52 #[cfg(feature = "agents")]
53 lck_agent: None,
54 #[cfg(feature = "contracts")]
55 lck_contract: None,
56 db,
57 }
58 }
59
60 pub fn set_cache_size(&mut self, cache_size: NonZeroUsize) -> Result<()> {
66 match &mut self.code_store {
67 Some(_cache) => {
68 panic!("cannot initialize cache twice")
69 }
70 None => {
71 self.code_store = Some(CodeStore::with_cache_size(self.db, cache_size)?);
72 }
73 }
74 Ok(())
75 }
76
/// Spawns a new contract runtime on the factory's database.
///
/// The runtime receives a clone of the factory's code store and a clone of
/// the factory's contract lock. Both are created on first use: the code
/// store through `CodeStore::new`, the lock through `ContractLock::default`.
///
/// # Errors
/// Returns an error if the code store has to be created and that fails, or
/// if `ContractRuntime::new` fails.
#[cfg(feature = "contracts")]
pub fn spawn_contract_rt(&mut self) -> Result<ContractRuntime<S>> {
    // Reuse the existing store, or create and remember a default one.
    let code_store = match &self.code_store {
        Some(existing) => existing.clone(),
        None => self.code_store.insert(CodeStore::new(self.db)?).clone(),
    };
    // The lock is only created after the code store is known to exist.
    let lock = self
        .lck_contract
        .get_or_insert_with(ContractLock::default)
        .clone();

    ContractRuntime::new(self.db, code_store, lock)
}
92
/// Spawns a new agent runtime on the factory's database.
///
/// The runtime receives a clone of the factory's code store and a clone of
/// the factory's agent lock. Both are created on first use: the code store
/// through `CodeStore::new`, the lock through `AgentLock::default`.
///
/// # Errors
/// Returns an error if the code store has to be created and that fails, or
/// if `AgentRuntime::new` fails.
#[cfg(feature = "agents")]
pub fn spawn_agent_rt(&mut self) -> Result<AgentRuntime<S>> {
    // Reuse the existing store, or create and remember a default one.
    let code_store = match &self.code_store {
        Some(existing) => existing.clone(),
        None => self.code_store.insert(CodeStore::new(self.db)?).clone(),
    };
    // The lock is only created after the code store is known to exist.
    let lock = self
        .lck_agent
        .get_or_insert_with(AgentLock::default)
        .clone();

    AgentRuntime::new(self.db, code_store, lock)
}
108 }
109}
110
111#[cfg(feature = "code-store")]
112pub mod code_store {
113 use super::vm::VmState;
114 use borderless::{aid_prefix, cid_prefix, AgentId, ContractId};
115 use borderless_kv_store::{Db, RawRead, RawWrite, Tx};
116 use lru::LruCache;
117 use parking_lot::Mutex;
118 use std::time::Instant;
119 use std::{num::NonZeroUsize, sync::Arc};
120 use wasmtime::{Engine, Instance, Linker, Module, Store};
121
122 use crate::{log_shim::*, AGENT_SUB_DB, CONTRACT_SUB_DB};
123 use crate::{Result, WASM_CODE_SUB_DB};
124
125 type Id = [u8; 16];
127
128 #[derive(Clone)]
130 pub struct CodeStore<S: Db> {
131 db: S,
132 cache: Arc<Mutex<LruCache<Id, Module, ahash::RandomState>>>,
133 }
134
135 impl<S: Db> CodeStore<S> {
136 pub fn new(db: &S) -> Result<Self> {
137 Self::with_cache_size(db, NonZeroUsize::new(16).unwrap())
138 }
139
140 pub fn with_cache_size(db: &S, cache_size: NonZeroUsize) -> Result<Self> {
141 let _db_ptr = db.create_sub_db(WASM_CODE_SUB_DB)?;
142 let cache = LruCache::with_hasher(cache_size, ahash::RandomState::default());
143 Ok(Self {
144 db: db.clone(),
145 cache: Arc::new(Mutex::new(cache)),
146 })
147 }
148
149 pub fn create_store(&self, engine: &Engine) -> Result<Store<VmState<S>>> {
150 let db_ptr = if engine.is_async() {
153 self.db.open_sub_db(AGENT_SUB_DB)?
154 } else {
155 self.db.open_sub_db(CONTRACT_SUB_DB)?
156 };
157 let state = VmState::new(self.db.clone(), db_ptr);
158 let store = Store::new(engine, state);
159 Ok(store)
160 }
161
162 pub fn insert_contract(&self, cid: ContractId, module: Module) -> Result<()> {
163 let module_bytes = module.serialize()?;
164 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
165 let mut txn = self.db.begin_rw_txn()?;
166 txn.write(&db_ptr, &cid, &module_bytes)?;
167 txn.commit()?;
168 Ok(())
169 }
170
171 pub fn insert_swagent(&self, aid: AgentId, module: Module) -> Result<()> {
172 let module_bytes = module.serialize()?;
173 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
174 let mut txn = self.db.begin_rw_txn()?;
175 txn.write(&db_ptr, &aid, &module_bytes)?;
176 txn.commit()?;
177 Ok(())
178 }
179
180 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid)))]
181 pub fn get_contract(
182 &mut self,
183 cid: &ContractId,
184 engine: &Engine,
185 linker: &mut Linker<VmState<S>>,
186 ) -> Result<Option<(Instance, Store<VmState<S>>)>> {
187 let start = Instant::now();
188 let module = match self.read_module(cid.as_bytes(), engine)? {
189 Some(m) => m,
190 None => return Ok(None),
191 };
192 let elapsed = start.elapsed();
193 debug!("Read module in {elapsed:?}");
194 let start = Instant::now();
195 let mut store = self.create_store(engine)?;
196 let instance = linker.instantiate(&mut store, &module)?;
197 let elapsed = start.elapsed();
198 debug!("Instantiated module in {elapsed:?}");
199 Ok(Some((instance, store)))
200 }
201
202 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid)))]
203 pub async fn get_agent(
204 &mut self,
205 aid: &AgentId,
206 engine: &Engine,
207 linker: &mut Linker<VmState<S>>,
208 ) -> Result<Option<(Instance, Store<VmState<S>>)>> {
209 let start = Instant::now();
210 let module = match self.read_module(aid.as_bytes(), engine)? {
211 Some(m) => m,
212 None => return Ok(None),
213 };
214 let elapsed = start.elapsed();
215 debug!("Read module in {elapsed:?}");
216 let start = Instant::now();
217 let mut store = self.create_store(engine)?;
218 let instance = linker.instantiate_async(&mut store, &module).await?;
219 let elapsed = start.elapsed();
220 debug!("Instantiated module in {elapsed:?}");
221 Ok(Some((instance, store)))
222 }
223
224 fn read_module(&mut self, key: &[u8; 16], engine: &Engine) -> Result<Option<Module>> {
230 if let Some(module) = self.cache.lock().get(key.as_ref()) {
231 return Ok(Some(module.clone()));
232 }
233 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
234 let txn = self.db.begin_ro_txn()?;
235 let module_bytes = txn.read(&db_ptr, &key)?;
236 let module = match module_bytes {
237 Some(bytes) => unsafe { Module::deserialize(engine, bytes)? },
238 None => return Ok(None),
239 };
240 txn.commit()?;
241 self.cache.lock().push(*key, module.clone());
243 Ok(Some(module))
244 }
245
246 pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
247 use borderless_kv_store::*;
248
249 let mut out = Vec::new();
250 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
251 let txn = self.db.begin_ro_txn()?;
252 let mut cursor = txn.ro_cursor(&db_ptr)?;
253 for (key, _value) in cursor.iter().filter(|(key, _)| cid_prefix(key)) {
255 let cid =
256 ContractId::from_bytes(key.try_into().map_err(|_| {
257 crate::Error::msg("failed to parse contract-id from storage")
258 })?);
259 out.push(cid);
260 }
261 drop(cursor);
262 txn.commit()?;
263 Ok(out)
264 }
265
266 pub fn available_swagents(&self) -> Result<Vec<AgentId>> {
267 use borderless_kv_store::*;
268
269 let mut out = Vec::new();
270 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
271 let txn = self.db.begin_ro_txn()?;
272 let mut cursor = txn.ro_cursor(&db_ptr)?;
273 for (key, _value) in cursor.iter().filter(|(key, _)| aid_prefix(key)) {
275 let aid = AgentId::from_bytes(
276 key.try_into()
277 .map_err(|_| crate::Error::msg("failed to parse agent-id from storage"))?,
278 );
279 out.push(aid);
280 }
281 drop(cursor);
282 txn.commit()?;
283 Ok(out)
284 }
285
286 pub fn get_db(&self) -> S {
288 self.db.clone()
289 }
290 }
291}