1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::agents::Init;
7use borderless::common::{Introduction, Revocation, Symbols};
8use borderless::events::Events;
9use borderless::{events::CallAction, AgentId, BorderlessId};
10use borderless_kv_store::backend::lmdb::Lmdb;
11use borderless_kv_store::Db;
12use parking_lot::Mutex as SyncMutex;
13use tokio::sync::{mpsc, Mutex};
14use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module, Store};
15
16use super::vm::AgentCommit;
17use super::{
18 code_store::CodeStore,
19 vm::{self, VmState},
20};
21use crate::log_shim::*;
22use crate::{
23 db::logger,
24 error::{ErrorKind, Result},
25 AGENT_SUB_DB,
26};
27
28pub mod tasks;
29
30pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
31
32pub struct Runtime<S = Lmdb>
33where
34 S: Db,
35{
36 linker: Linker<VmState<S>>,
37 store: Store<VmState<S>>,
38 engine: Engine,
39 agent_store: CodeStore<S>,
40 mutability_lock: MutLock,
41}
42
43impl<S: Db> Runtime<S> {
44 pub fn new(storage: &S, agent_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
45 let db_ptr = storage.create_sub_db(AGENT_SUB_DB)?;
46 let start = Instant::now();
47 let state = VmState::new_async(storage.clone(), db_ptr);
48
49 let mut config = Config::new();
50 config.cranelift_opt_level(wasmtime::OptLevel::Speed);
51 config.async_support(true); let engine = Engine::new(&config)?;
53 let mut linker: Linker<VmState<S>> = Linker::new(&engine);
56
57 linker.func_wrap(
60 "env",
61 "print",
62 |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
63 )?;
64 linker.func_wrap(
65 "env",
66 "read_register",
67 |caller: Caller<'_, VmState<S>>, register_id, ptr| {
68 vm::read_register(caller, register_id, ptr)
69 },
70 )?;
71 linker.func_wrap(
72 "env",
73 "register_len",
74 |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
75 )?;
76 linker.func_wrap(
77 "env",
78 "write_register",
79 |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
80 vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
81 },
82 )?;
83 linker.func_wrap(
84 "env",
85 "storage_read",
86 |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
87 vm::storage_read(caller, base_key, sub_key, register_id)
88 },
89 )?;
90 linker.func_wrap(
91 "env",
92 "storage_write",
93 |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
94 vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
95 },
96 )?;
97 linker.func_wrap(
98 "env",
99 "storage_remove",
100 |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
101 vm::storage_remove(caller, base_key, sub_key)
102 },
103 )?;
104 linker.func_wrap(
105 "env",
106 "storage_has_key",
107 |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
108 vm::storage_has_key(caller, base_key, sub_key)
109 },
110 )?;
111 linker.func_wrap(
112 "env",
113 "storage_cursor",
114 |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
115 )?;
116
117 linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
120 linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
121 vm::tic(caller)
122 })?;
123 linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
124 vm::toc(caller)
125 })?;
126 linker.func_wrap("env", "rand", vm::rand)?;
127
128 linker.func_wrap_async(
130 "env",
131 "send_http_rq",
132 |caller: Caller<'_, VmState<S>>, (rq_head, rq_body, rs_head, rs_body, err)| {
133 Box::new(vm::async_abi::send_http_rq(
134 caller, rq_head, rq_body, rs_head, rs_body, err,
135 ))
136 },
137 )?;
138 linker.func_wrap_async(
139 "env",
140 "send_ws_msg",
141 |caller: Caller<'_, VmState<S>>, (msg_ptr, msg_len)| {
142 Box::new(vm::async_abi::send_ws_msg(caller, msg_ptr, msg_len))
143 },
144 )?;
145
146 linker.func_wrap("env", "timestamp", vm::timestamp)?;
147
148 let store = Store::new(&engine, state);
149
150 info!("Initialized runtime in: {:?}", start.elapsed());
151
152 Ok(Self {
153 linker,
154 store,
155 engine,
156 agent_store,
157 mutability_lock: lock,
158 })
159 }
160
161 pub fn into_shared(self) -> Arc<Mutex<Self>> {
162 Arc::new(Mutex::new(self))
163 }
164
165 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%agent_id), err))]
166 pub fn instantiate_sw_agent(&mut self, agent_id: AgentId, module_bytes: &[u8]) -> Result<()> {
167 let module = Module::new(&self.engine, module_bytes)?;
168 check_module(&self.engine, &module)?;
169 self.agent_store.insert_swagent(agent_id, module)?;
170 Ok(())
171 }
172
173 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
177 pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
178 let bytes = executor_id.into_bytes().to_vec();
179 self.store.data_mut().set_register(REGISTER_EXECUTOR, bytes);
180 Ok(())
181 }
182
183 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
185 pub fn register_ws(&mut self, aid: AgentId) -> Result<mpsc::Receiver<Vec<u8>>> {
186 let (tx, rx) = mpsc::channel(4);
187 self.store.data_mut().register_ws(aid, tx)?;
188 Ok(rx)
189 }
190
191 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
192 pub async fn initialize(&mut self, aid: &AgentId) -> Result<Init> {
193 let instance = self
194 .agent_store
195 .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
196 .await?
197 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
198
199 let func = instance.get_typed_func::<(), ()>(&mut self.store, "on_init")?;
201 self.store.data_mut().begin_agent_exec(*aid, false)?;
202
203 if let Err(e) = func.call_async(&mut self.store, ()).await {
204 warn!("initialize failed with error: {e}");
205 }
206 self.store.data_mut().finish_agent_exec(None)?;
207
208 let bytes = self
210 .store
211 .data()
212 .get_register(REGISTER_OUTPUT)
213 .ok_or_else(|| ErrorKind::MissingRegisterValue("init-output"))?;
214 let init = Init::from_bytes(&bytes)?;
215
216 Ok(init)
217 }
218
219 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
220 pub async fn process_ws_msg(&mut self, aid: &AgentId, msg: Vec<u8>) -> Result<Option<Events>> {
221 self.call_mut(aid, msg, "on_ws_msg", AgentCommit::Other)
222 .await
223 }
224
225 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
226 pub async fn on_ws_open(&mut self, aid: &AgentId) -> Result<Option<Events>> {
227 self.call_mut(aid, Vec::new(), "on_ws_open", AgentCommit::Other)
228 .await
229 }
230
231 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
232 pub async fn on_ws_error(&mut self, aid: &AgentId) -> Result<Option<Events>> {
233 self.call_mut(aid, Vec::new(), "on_ws_error", AgentCommit::Other)
234 .await
235 }
236
237 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
238 pub async fn on_ws_close(&mut self, aid: &AgentId) -> Result<Option<Events>> {
239 self.call_mut(aid, Vec::new(), "on_ws_close", AgentCommit::Other)
240 .await
241 }
242
243 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %introduction.id), err))]
248 pub async fn process_introduction(&mut self, introduction: Introduction) -> Result<()> {
249 let aid = match introduction.id {
250 borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
251 borderless::prelude::Id::Agent { agent_id } => agent_id,
252 };
253 let initial_state = introduction.initial_state.to_string().into_bytes();
256 let res = self
257 .call_mut(
258 &aid,
259 initial_state,
260 "process_introduction",
261 AgentCommit::Introduction { introduction },
262 )
263 .await?;
264 assert!(res.is_none(), "introductions should not write events");
265 Ok(())
266 }
267
268 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %revocation.id), err))]
270 pub async fn process_revocation(&mut self, revocation: Revocation) -> Result<()> {
271 let aid = match revocation.id {
272 borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
273 borderless::prelude::Id::Agent { agent_id } => agent_id,
274 };
275 let input = revocation.to_bytes()?;
278 let res = self
279 .call_mut(
280 &aid,
281 input,
282 "process_revocation",
283 AgentCommit::Revocation { revocation },
284 )
285 .await?;
286 assert!(res.is_none(), "revocations should not write events");
287 Ok(())
288 }
289
290 #[must_use = "You have to handle the output events of this function"]
294 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
295 pub async fn process_action(
296 &mut self,
297 aid: &AgentId,
298 action: CallAction,
299 ) -> Result<Option<Events>> {
300 let input = action.to_bytes()?;
302 self.call_mut(aid, input, "process_action", AgentCommit::Other)
303 .await
304 }
305
306 async fn call_mut(
308 &mut self,
309 aid: &AgentId,
310 input: Vec<u8>,
311 method: &'static str,
312 commit: AgentCommit,
313 ) -> Result<Option<Events>> {
314 let instance = self
315 .agent_store
316 .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
317 .await?
318 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
319
320 let lock = self.mutability_lock.get_lock(aid);
321 let _guard = lock.lock().await;
322
323 self.store.data_mut().set_register(REGISTER_INPUT, input);
325
326 let func = instance.get_typed_func::<(), ()>(&mut self.store, method)?;
328 self.store.data_mut().begin_agent_exec(*aid, true)?;
329
330 let _logs = match func.call_async(&mut self.store, ()).await {
331 Ok(()) => self.store.data_mut().finish_agent_exec(Some(commit))?,
332 Err(e) => {
333 warn!("{method} failed with error: {e}");
334 self.store.data_mut().finish_agent_exec(None)?
335 }
336 };
337 match self.store.data().get_register(REGISTER_OUTPUT) {
342 Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
343 None => Ok(None),
344 }
345 }
346
347 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path), err))]
350 pub async fn http_get_state(&mut self, aid: &AgentId, path: String) -> Result<(u16, Vec<u8>)> {
351 let instance = self
353 .agent_store
354 .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
355 .await?
356 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
357
358 self.store
360 .data_mut()
361 .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
362
363 let func = instance.get_typed_func::<(), ()>(&mut self.store, "http_get_state")?;
365
366 self.store.data_mut().begin_agent_exec(*aid, false)?;
368 if let Err(e) = func.call_async(&mut self.store, ()).await {
369 warn!("http_get_state failed with error: {e}");
370 }
371 let log = self.store.data_mut().finish_agent_exec(None)?;
373
374 let status = self
375 .store
376 .data()
377 .get_register(REGISTER_OUTPUT_HTTP_STATUS)
378 .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
379 let status = u16::from_be_bytes(status.try_into().unwrap());
380
381 let result = self
382 .store
383 .data()
384 .get_register(REGISTER_OUTPUT_HTTP_RESULT)
385 .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
386
387 for l in log {
389 logger::print_log_line(l);
390 }
391 Ok((status, result))
392 }
393
394 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path, %writer), err))]
403 pub async fn http_post_action(
404 &mut self,
405 aid: &AgentId,
406 path: String,
407 payload: Vec<u8>,
408 writer: &BorderlessId,
409 ) -> Result<std::result::Result<(Events, CallAction), (u16, String)>> {
410 let instance = self
411 .agent_store
412 .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
413 .await?
414 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
415
416 let lock = self.mutability_lock.get_lock(aid);
417 let _guard = lock.lock().await;
418
419 self.store
423 .data_mut()
424 .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
425
426 self.store
427 .data_mut()
428 .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
429
430 self.store
431 .data_mut()
432 .set_register(REGISTER_WRITER, writer.into_bytes().into());
433
434 let func = instance.get_typed_func::<(), ()>(&mut self.store, "http_post_action")?;
436
437 self.store.data_mut().begin_agent_exec(*aid, true)?;
439 if let Err(e) = func.call_async(&mut self.store, ()).await {
440 warn!("http_get_state failed with error: {e}");
441 }
442 let log = self
444 .store
445 .data_mut()
446 .finish_agent_exec(Some(AgentCommit::Other))?;
447
448 let status = self
449 .store
450 .data()
451 .get_register(REGISTER_OUTPUT_HTTP_STATUS)
452 .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
453 let status = u16::from_be_bytes(status.try_into().unwrap());
454
455 let result = self
456 .store
457 .data()
458 .get_register(REGISTER_OUTPUT_HTTP_RESULT)
459 .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
460
461 for l in log {
463 logger::print_log_line(l);
464 }
465
466 if status == 200 {
467 let events = match self.store.data().get_register(REGISTER_OUTPUT) {
468 Some(b) => Events::from_bytes(&b)?,
469 None => Events::default(),
470 };
471 let action = CallAction::from_bytes(&result)?;
472 Ok(Ok((events, action)))
473 } else {
474 let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
475 register: "http-result",
476 expected_type: "string",
477 })?;
478 Ok(Err((status, error)))
479 }
480 }
481
482 pub async fn get_symbols(&mut self, aid: &AgentId) -> Result<Option<Symbols>> {
484 let instance = self
485 .agent_store
486 .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
487 .await?
488 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
489
490 let func = instance.get_typed_func::<(), ()>(&mut self.store, "get_symbols")?;
492
493 self.store.data_mut().begin_agent_exec(*aid, false)?;
495 if let Err(e) = func.call_async(&mut self.store, ()).await {
496 warn!("http_get_state failed with error: {e}");
497 }
498 self.store.data_mut().finish_agent_exec(None)?;
500
501 let bytes = match self.store.data().get_register(REGISTER_OUTPUT) {
502 Some(b) => b,
503 None => return Ok(None),
504 };
505 let symbols = Symbols::from_bytes(&bytes)?;
506 Ok(Some(symbols))
507 }
508
509 pub fn available_agents(&self) -> Result<Vec<AgentId>> {
510 self.agent_store.available_swagents()
511 }
512}
513
514type Lock = Arc<Mutex<()>>;
515
516#[derive(Clone, Default)]
532pub struct MutLock {
533 map: Arc<SyncMutex<HashMap<AgentId, Lock>>>,
534}
535
536impl MutLock {
537 pub fn get_lock(&self, aid: &AgentId) -> Lock {
541 let mut map = self.map.lock();
542 let lock = map.entry(*aid).or_default();
543 lock.clone()
544 }
545}
546
547fn check_module(engine: &Engine, module: &Module) -> Result<()> {
551 let functions = [
552 "on_init",
553 "on_shutdown",
554 "process_action",
555 "process_introduction",
556 "process_revocation",
557 "http_get_state",
558 "http_post_action",
559 "get_symbols",
560 ];
561 for func in functions {
562 let exp = module
563 .get_export(func)
564 .ok_or_else(|| ErrorKind::MissingExport { func })?;
565 if let ExternType::Func(func_type) = exp {
566 if !func_type.matches(&FuncType::new(engine, [], [])) {
567 return Err(ErrorKind::InvalidFuncType { func }.into());
568 }
569 } else {
570 return Err(ErrorKind::InvalidExport { func }.into());
571 }
572 }
573 Ok(())
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579
580 const ALL_EXPORTS: &str = r#"
581(module
582 ;; Declare the function `placeholder`
583 (func $placeholder)
584
585 ;; Export the functions so they can be called from outside the module
586 (export "on_init" (func $placeholder))
587 (export "on_shutdown" (func $placeholder))
588 (export "process_action" (func $placeholder))
589 (export "process_introduction" (func $placeholder))
590 (export "process_revocation" (func $placeholder))
591 (export "http_get_state" (func $placeholder))
592 (export "http_post_action" (func $placeholder))
593 (export "get_symbols" (func $placeholder))
594)
595"#;
596 fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
597 let mut new_lines = Vec::new();
599
600 for line in original.lines() {
601 if !line.contains(pattern) {
603 new_lines.push(line);
605 }
606 }
607
608 new_lines.join("\n")
610 }
611
612 #[test]
613 fn missing_exports() {
614 let mut config = Config::new();
615 config.cranelift_opt_level(wasmtime::OptLevel::Speed);
616 config.async_support(false);
617 let engine = Engine::new(&config).unwrap();
618
619 let functions = [
621 "on_init",
622 "on_shutdown",
623 "process_action",
624 "process_introduction",
625 "process_revocation",
626 "http_get_state",
627 "http_post_action",
628 "get_symbols",
629 ];
630 for func in functions {
631 let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
632 let module = Module::new(&engine, &wat_missing);
633 assert!(module.is_ok());
634 let err = check_module(&engine, &module.unwrap());
635 assert!(err.is_err());
636 }
637 let module = Module::new(&engine, &ALL_EXPORTS);
638 assert!(module.is_ok());
639
640 let err = check_module(&engine, &module.unwrap());
641 assert!(err.is_ok());
642 }
643}