1#[cfg(feature = "contracts")]
2pub mod contract;
3
4#[cfg(feature = "agents")]
5pub mod agent;
6
7#[cfg(any(feature = "contracts", feature = "agents"))]
8mod vm;
9
10#[cfg(any(feature = "contracts", feature = "agents"))]
11pub use code_store::CodeStore;
12
13#[cfg(any(feature = "contracts", feature = "agents"))]
14pub mod factory {
15 use std::num::NonZeroUsize;
16
17 use super::CodeStore;
18 use crate::{AgentLock, AgentRuntime, ContractLock, ContractRuntime, Result};
19 use borderless_kv_store::Db;
20
21 pub struct RtFactory<'a, S: Db> {
27 code_store: Option<CodeStore<S>>,
28 #[cfg(feature = "contracts")]
29 lck_contract: Option<ContractLock>,
30 #[cfg(feature = "agents")]
31 lck_agent: Option<AgentLock>,
32 db: &'a S,
33 }
34
35 impl<'a, S: Db> RtFactory<'a, S> {
36 pub fn new(db: &'a S) -> Self {
38 Self {
39 code_store: None,
40 #[cfg(feature = "agents")]
41 lck_agent: None,
42 #[cfg(feature = "contracts")]
43 lck_contract: None,
44 db,
45 }
46 }
47
48 pub fn with_store(db: &'a S, code_store: CodeStore<S>) -> Self {
50 Self {
51 code_store: Some(code_store),
52 #[cfg(feature = "agents")]
53 lck_agent: None,
54 #[cfg(feature = "contracts")]
55 lck_contract: None,
56 db,
57 }
58 }
59
60 pub fn set_cache_size(&mut self, cache_size: NonZeroUsize) -> Result<()> {
66 match &mut self.code_store {
67 Some(_cache) => {
68 panic!("cannot initialize cache twice")
69 }
70 None => {
71 self.code_store = Some(CodeStore::with_cache_size(self.db, cache_size)?);
72 }
73 }
74 Ok(())
75 }
76
77 #[cfg(feature = "contracts")]
79 pub fn spawn_contract_rt(&mut self) -> Result<ContractRuntime<S>> {
80 if self.code_store.is_none() {
81 self.code_store = Some(CodeStore::new(self.db)?);
82 }
83 let code_store = self.code_store.as_ref().unwrap();
84
85 if self.lck_contract.is_none() {
86 self.lck_contract = Some(ContractLock::default());
87 }
88 let lock = self.lck_contract.as_ref().unwrap();
89
90 Ok(ContractRuntime::new(
91 self.db,
92 code_store.clone(),
93 lock.clone(),
94 )?)
95 }
96
97 #[cfg(feature = "agents")]
99 pub fn spawn_agent_rt(&mut self) -> Result<AgentRuntime<S>> {
100 if self.code_store.is_none() {
101 self.code_store = Some(CodeStore::new(self.db)?);
102 }
103 let code_store = self.code_store.as_ref().unwrap();
104
105 if self.lck_agent.is_none() {
106 self.lck_agent = Some(AgentLock::default());
107 }
108 let lock = self.lck_agent.as_ref().unwrap();
109
110 Ok(AgentRuntime::new(
111 self.db,
112 code_store.clone(),
113 lock.clone(),
114 )?)
115 }
116 }
117}
118
119#[cfg(feature = "code-store")]
120pub mod code_store {
121 use super::vm::VmState;
122 use borderless::{aid_prefix, cid_prefix, AgentId, ContractId};
123 use borderless_kv_store::{Db, RawRead, RawWrite, Tx};
124 use lru::LruCache;
125 use parking_lot::Mutex;
126 use std::{num::NonZeroUsize, sync::Arc};
127 use wasmtime::{Engine, Instance, Linker, Module, Store};
128
129 use crate::log_shim::*;
130 use crate::{Result, WASM_CODE_SUB_DB};
131
132 type Id = [u8; 16];
134
135 #[derive(Clone)]
137 pub struct CodeStore<S: Db> {
138 db: S,
139 cache: Arc<Mutex<LruCache<Id, Instance, ahash::RandomState>>>,
140 }
141
142 impl<S: Db> CodeStore<S> {
143 pub fn new(db: &S) -> Result<Self> {
144 Self::with_cache_size(db, NonZeroUsize::new(16).unwrap())
145 }
146
147 pub fn with_cache_size(db: &S, cache_size: NonZeroUsize) -> Result<Self> {
148 let _db_ptr = db.create_sub_db(WASM_CODE_SUB_DB)?;
149 let cache = LruCache::with_hasher(cache_size, ahash::RandomState::default());
150 Ok(Self {
151 db: db.clone(),
152 cache: Arc::new(Mutex::new(cache)),
153 })
154 }
155
156 pub fn insert_contract(&self, cid: ContractId, module: Module) -> Result<()> {
157 let module_bytes = module.serialize()?;
158 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
159 let mut txn = self.db.begin_rw_txn()?;
160 txn.write(&db_ptr, &cid, &module_bytes)?;
161 txn.commit()?;
162 Ok(())
163 }
164
165 pub fn insert_swagent(&self, aid: AgentId, module: Module) -> Result<()> {
166 let module_bytes = module.serialize()?;
167 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
168 let mut txn = self.db.begin_rw_txn()?;
169 txn.write(&db_ptr, &aid, &module_bytes)?;
170 txn.commit()?;
171 Ok(())
172 }
173
174 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid)))]
175 pub fn get_contract(
176 &mut self,
177 cid: &ContractId,
178 engine: &Engine,
179 store: &mut Store<VmState<S>>,
180 linker: &mut Linker<VmState<S>>,
181 ) -> Result<Option<Instance>> {
182 let start = std::time::Instant::now();
183 if let Some(instance) = self.cache.lock().get(cid.as_bytes()) {
184 let elapsed = start.elapsed();
185 debug!("Served cached module in {elapsed:?}");
186 return Ok(Some(*instance));
187 }
188 let module = match self.read_module(cid, engine)? {
189 Some(m) => m,
190 None => return Ok(None),
191 };
192 let elapsed = start.elapsed();
193 debug!("Read module in {elapsed:?}");
194 let start = std::time::Instant::now();
195 let instance = linker.instantiate(store, &module)?;
196 self.cache.lock().push(*cid.as_bytes(), instance);
197 let elapsed = start.elapsed();
198 debug!("Instantiated module in {elapsed:?}");
199 Ok(Some(instance))
200 }
201
202 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid)))]
203 pub async fn get_agent(
204 &mut self,
205 aid: &AgentId,
206 engine: &Engine,
207 store: &mut Store<VmState<S>>,
208 linker: &mut Linker<VmState<S>>,
209 ) -> Result<Option<Instance>> {
210 let start = std::time::Instant::now();
211 if let Some(instance) = self.cache.lock().get(aid.as_bytes()) {
212 let elapsed = start.elapsed();
213 debug!("Served cached module in {elapsed:?}");
214 return Ok(Some(*instance));
215 }
216 let module = match self.read_module(aid, engine)? {
217 Some(m) => m,
218 None => return Ok(None),
219 };
220 let elapsed = start.elapsed();
221 debug!("Read module in {elapsed:?}");
222 let start = std::time::Instant::now();
223 let instance = linker.instantiate_async(store, &module).await?;
224 self.cache.lock().push(*aid.as_bytes(), instance);
225 let elapsed = start.elapsed();
226 debug!("Instantiated module in {elapsed:?}");
227 Ok(Some(instance))
228 }
229
230 fn read_module(
236 &mut self,
237 key: impl AsRef<[u8]>,
238 engine: &Engine,
239 ) -> Result<Option<Module>> {
240 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
241 let txn = self.db.begin_ro_txn()?;
242 let module_bytes = txn.read(&db_ptr, &key)?;
243 let module = match module_bytes {
244 Some(bytes) => unsafe { Module::deserialize(engine, bytes)? },
245 None => return Ok(None),
246 };
247 txn.commit()?;
248 Ok(Some(module))
249 }
250
251 pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
252 use borderless_kv_store::*;
253
254 let mut out = Vec::new();
255 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
256 let txn = self.db.begin_ro_txn()?;
257 let mut cursor = txn.ro_cursor(&db_ptr)?;
258 for (key, _value) in cursor.iter().filter(|(key, _)| cid_prefix(key)) {
260 let cid =
261 ContractId::from_bytes(key.try_into().map_err(|_| {
262 crate::Error::msg("failed to parse contract-id from storage")
263 })?);
264 out.push(cid);
265 }
266 drop(cursor);
267 txn.commit()?;
268 Ok(out)
269 }
270
271 pub fn available_swagents(&self) -> Result<Vec<AgentId>> {
272 use borderless_kv_store::*;
273
274 let mut out = Vec::new();
275 let db_ptr = self.db.open_sub_db(WASM_CODE_SUB_DB)?;
276 let txn = self.db.begin_ro_txn()?;
277 let mut cursor = txn.ro_cursor(&db_ptr)?;
278 for (key, _value) in cursor.iter().filter(|(key, _)| aid_prefix(key)) {
280 let aid = AgentId::from_bytes(
281 key.try_into()
282 .map_err(|_| crate::Error::msg("failed to parse agent-id from storage"))?,
283 );
284 out.push(aid);
285 }
286 drop(cursor);
287 txn.commit()?;
288 Ok(out)
289 }
290 }
291}