1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::agents::Init;
7use borderless::common::{Introduction, Revocation, Symbols};
8use borderless::events::Events;
9use borderless::{events::CallAction, AgentId, BorderlessId};
10use borderless_kv_store::backend::lmdb::Lmdb;
11use borderless_kv_store::Db;
12use http::StatusCode;
13use parking_lot::Mutex as SyncMutex;
14use tokio::sync::{mpsc, Mutex};
15use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module};
16
17use super::vm::{ActiveEntity, Commit};
18use super::{
19 code_store::CodeStore,
20 vm::{self, VmState},
21};
22use crate::db::controller::Controller;
23use crate::log_shim::*;
24use crate::{
25 error::{ErrorKind, Result},
26 AGENT_SUB_DB, SUBSCRIPTION_REL_SUB_DB,
27};
28
29pub mod tasks;
30
/// A [`Runtime`] wrapped in an `Arc` and an async (tokio) `Mutex`, so it can be shared.
pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
32
33pub struct Runtime<S = Lmdb>
34where
35 S: Db,
36{
37 linker: Linker<VmState<S>>,
38 engine: Engine,
39 agent_store: CodeStore<S>,
40 mutability_lock: MutLock,
41 executor: Option<Vec<u8>>,
42}
43
44impl<S: Db> Runtime<S> {
45 pub fn new(storage: &S, agent_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
46 let start = Instant::now();
47 let _ = storage.create_sub_db(AGENT_SUB_DB)?;
49 let _ = storage.create_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
50
51 let mut config = Config::new();
53 config.cranelift_opt_level(wasmtime::OptLevel::Speed);
54 config.async_support(true); let engine = Engine::new(&config)?;
56
57 let mut linker: Linker<VmState<S>> = Linker::new(&engine);
58
59 linker.func_wrap(
62 "env",
63 "print",
64 |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
65 )?;
66 linker.func_wrap(
67 "env",
68 "read_register",
69 |caller: Caller<'_, VmState<S>>, register_id, ptr| {
70 vm::read_register(caller, register_id, ptr)
71 },
72 )?;
73 linker.func_wrap(
74 "env",
75 "register_len",
76 |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
77 )?;
78 linker.func_wrap(
79 "env",
80 "write_register",
81 |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
82 vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
83 },
84 )?;
85 linker.func_wrap(
86 "env",
87 "storage_read",
88 |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
89 vm::storage_read(caller, base_key, sub_key, register_id)
90 },
91 )?;
92 linker.func_wrap(
93 "env",
94 "storage_write",
95 |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
96 vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
97 },
98 )?;
99 linker.func_wrap(
100 "env",
101 "storage_remove",
102 |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
103 vm::storage_remove(caller, base_key, sub_key)
104 },
105 )?;
106 linker.func_wrap(
107 "env",
108 "storage_has_key",
109 |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
110 vm::storage_has_key(caller, base_key, sub_key)
111 },
112 )?;
113 linker.func_wrap(
114 "env",
115 "storage_cursor",
116 |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
117 )?;
118
119 linker.func_wrap(
120 "env",
121 "subscribe",
122 |caller: Caller<'_, VmState<S>>, wasm_ptr, wasm_len| {
123 vm::subscribe(caller, wasm_ptr, wasm_len)
124 },
125 )?;
126
127 linker.func_wrap(
128 "env",
129 "unsubscribe",
130 |caller: Caller<'_, VmState<S>>, wasm_ptr, wasm_len| {
131 vm::unsubscribe(caller, wasm_ptr, wasm_len)
132 },
133 )?;
134
135 linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
138 linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
139 vm::tic(caller)
140 })?;
141 linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
142 vm::toc(caller)
143 })?;
144 linker.func_wrap("env", "rand", vm::rand)?;
145
146 linker.func_wrap_async(
148 "env",
149 "send_http_rq",
150 |caller: Caller<'_, VmState<S>>, (rq_head, rq_body, rs_head, rs_body, err)| {
151 Box::new(vm::async_abi::send_http_rq(
152 caller, rq_head, rq_body, rs_head, rs_body, err,
153 ))
154 },
155 )?;
156 linker.func_wrap_async(
157 "env",
158 "send_ws_msg",
159 |caller: Caller<'_, VmState<S>>, (msg_ptr, msg_len)| {
160 Box::new(vm::async_abi::send_ws_msg(caller, msg_ptr, msg_len))
161 },
162 )?;
163
164 linker.func_wrap("env", "timestamp", |caller: Caller<'_, VmState<S>>| {
165 vm::timestamp(caller)
166 })?;
167
168 info!("Initialized runtime in: {:?}", start.elapsed());
169
170 Ok(Self {
171 linker,
172 engine,
173 agent_store,
174 mutability_lock: lock,
175 executor: None,
176 })
177 }
178
179 pub fn into_shared(self) -> Arc<Mutex<Self>> {
180 Arc::new(Mutex::new(self))
181 }
182
183 pub fn get_db(&self) -> S {
185 self.agent_store.get_db()
186 }
187
188 pub fn agent_exists(&self, aid: &AgentId) -> Result<bool> {
190 let db = self.get_db();
191 let controller = Controller::new(&db);
192 controller.agent_exists(aid)
193 }
194
195 pub fn agent_revoked(&self, aid: &AgentId) -> Result<bool> {
197 let db = self.get_db();
198 let controller = Controller::new(&db);
199 controller.agent_revoked(aid)
200 }
201
202 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%agent_id), err))]
203 pub fn instantiate_sw_agent(&mut self, agent_id: AgentId, module_bytes: &[u8]) -> Result<()> {
204 let module = Module::new(&self.engine, module_bytes)?;
205 check_module(&self.engine, &module)?;
206 self.agent_store.insert_swagent(agent_id, module)?;
207 Ok(())
208 }
209
210 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
212 pub async fn check_module_and_state(
213 &mut self,
214 module_bytes: Vec<u8>,
215 state: serde_json::Value,
216 ) -> Result<(bool, Vec<String>)> {
217 let module = Module::new(&self.engine, module_bytes)?;
218 check_module(&self.engine, &module)?;
219 let mut store = self.agent_store.create_store(&self.engine)?;
220 let instance = self.linker.instantiate(&mut store, &module)?;
221
222 store
224 .data_mut()
225 .set_register(REGISTER_INPUT, state.to_string().into_bytes());
226
227 let func = instance.get_typed_func::<(), ()>(&mut store, "parse_state")?;
229
230 store.data_mut().prepare_exec(ActiveEntity::None)?;
232
233 let success = match func.call_async(&mut store, ()).await {
235 Ok(()) => true,
236 Err(_e) => false,
237 };
238 let log = store.data_mut().finish_exec(None)?;
239 Ok((success, log.into_iter().map(|l| l.msg).collect()))
240 }
241
242 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
247 pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
248 let bytes = executor_id.into_bytes().to_vec();
249 self.executor = Some(bytes);
250 Ok(())
251 }
252
253 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
255 pub fn register_ws(&mut self, aid: AgentId) -> Result<mpsc::Receiver<Vec<u8>>> {
256 let (tx, rx) = mpsc::channel(4);
257 self.mutability_lock.insert_ws_sender(&aid, tx);
258 Ok(rx)
259 }
260
261 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
262 pub async fn initialize(&mut self, aid: &AgentId) -> Result<Init> {
263 let (instance, mut store) = self
264 .agent_store
265 .get_agent(aid, &self.engine, &mut self.linker)
266 .await?
267 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
268
269 store
271 .data_mut()
272 .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
273
274 let func = instance.get_typed_func::<(), ()>(&mut store, "on_init")?;
276 store
277 .data_mut()
278 .prepare_exec(ActiveEntity::agent(*aid, false))?;
279
280 if let Err(e) = func.call_async(&mut store, ()).await {
281 warn!("initialize failed with error: {e}");
282 }
283 let output = store.data().get_register(REGISTER_OUTPUT);
284 store.data_mut().finish_exec(None)?;
285
286 let bytes = output.ok_or_else(|| ErrorKind::MissingRegisterValue("init-output"))?;
288 let init = Init::from_bytes(&bytes)?;
289
290 Ok(init)
291 }
292
293 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
294 pub async fn process_ws_msg(&mut self, aid: &AgentId, msg: Vec<u8>) -> Result<Option<Events>> {
295 self.call_mut(aid, msg, "on_ws_msg", Commit::Other).await
296 }
297
298 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
299 pub async fn on_ws_open(&mut self, aid: &AgentId) -> Result<Option<Events>> {
300 self.call_mut(aid, Vec::new(), "on_ws_open", Commit::Other)
301 .await
302 }
303
304 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
305 pub async fn on_ws_error(&mut self, aid: &AgentId) -> Result<Option<Events>> {
306 self.call_mut(aid, Vec::new(), "on_ws_error", Commit::Other)
307 .await
308 }
309
310 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
311 pub async fn on_ws_close(&mut self, aid: &AgentId) -> Result<Option<Events>> {
312 self.call_mut(aid, Vec::new(), "on_ws_close", Commit::Other)
313 .await
314 }
315
316 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %introduction.id), err))]
319 pub async fn process_introduction(&mut self, introduction: Introduction) -> Result<()> {
320 let aid = match introduction.id {
321 borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
322 borderless::prelude::Id::Agent { agent_id } => agent_id,
323 };
324 let initial_state = introduction.initial_state.to_string().into_bytes();
327 let res = self
328 .call_mut(
329 &aid,
330 initial_state,
331 "process_introduction",
332 Commit::Introduction(introduction),
333 )
334 .await?;
335 assert!(res.is_none(), "introductions should not write events");
336 Ok(())
337 }
338
339 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %revocation.id), err))]
341 pub async fn process_revocation(&mut self, revocation: Revocation) -> Result<()> {
342 let aid = match revocation.id {
343 borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
344 borderless::prelude::Id::Agent { agent_id } => agent_id,
345 };
346 let input = revocation.to_bytes()?;
347 let res = self
348 .call_mut(
349 &aid,
350 input,
351 "process_revocation",
352 Commit::Revocation(revocation),
353 )
354 .await?;
355 assert!(res.is_none(), "revocations should not write events");
356 Ok(())
357 }
358
359 #[must_use = "You have to handle the output events of this function"]
363 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
364 pub async fn process_action(
365 &mut self,
366 aid: &AgentId,
367 action: CallAction,
368 ) -> Result<Option<Events>> {
369 let input = action.to_bytes()?;
371 self.call_mut(aid, input, "process_action", Commit::Other)
372 .await
373 }
374
375 async fn call_mut(
377 &mut self,
378 aid: &AgentId,
379 input: Vec<u8>,
380 method: &'static str,
381 commit: Commit,
382 ) -> Result<Option<Events>> {
383 let (instance, mut store) = self
384 .agent_store
385 .get_agent(aid, &self.engine, &mut self.linker)
386 .await?
387 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
388
389 if self.agent_revoked(aid)? {
390 return Err(ErrorKind::RevokedAgent { aid: *aid }.into());
391 }
392
393 let state = self.mutability_lock.get_lock_state(aid);
394 let _guard = state.lock.lock().await;
395
396 store.data_mut().set_register(REGISTER_INPUT, input);
398
399 store
401 .data_mut()
402 .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
403
404 if let Some(tx) = state.ws_sender {
406 store.data_mut().register_ws(tx)?;
407 }
408
409 let func = instance.get_typed_func::<(), ()>(&mut store, method)?;
411 store
412 .data_mut()
413 .prepare_exec(ActiveEntity::agent(*aid, true))?;
414
415 let commit = match func.call_async(&mut store, ()).await {
416 Ok(()) => Some(commit),
417 Err(e) => {
418 warn!("{method} failed with error: {e}");
419 None
420 }
421 };
422 let output = store.data().get_register(REGISTER_OUTPUT);
423 let _logs = store.data_mut().finish_exec(commit)?;
424
425 match output {
427 Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
428 None => Ok(None),
429 }
430 }
431
432 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path), err))]
435 pub async fn http_get_state(&mut self, aid: &AgentId, path: String) -> Result<(u16, Vec<u8>)> {
436 let (instance, mut store) = self
438 .agent_store
439 .get_agent(aid, &self.engine, &mut self.linker)
440 .await?
441 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
442
443 store
445 .data_mut()
446 .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
447
448 store
450 .data_mut()
451 .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
452
453 let func = instance.get_typed_func::<(), ()>(&mut store, "http_get_state")?;
455
456 store
458 .data_mut()
459 .prepare_exec(ActiveEntity::agent(*aid, false))?;
460
461 if let Err(e) = func.call_async(&mut store, ()).await {
463 warn!("http_get_state failed with error: {e}");
464 }
465 let status = store.data().get_register(REGISTER_OUTPUT_HTTP_STATUS);
466 let result = store.data().get_register(REGISTER_OUTPUT_HTTP_RESULT);
467
468 let _log = store.data_mut().finish_exec(None)?;
470
471 let status = status.ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
473 let status_bytes = status
474 .try_into()
475 .map_err(|_| ErrorKind::InvalidRegisterValue {
476 register: "http-status",
477 expected_type: "u16",
478 })?;
479 let status = u16::from_be_bytes(status_bytes);
480
481 let result = result.ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
483
484 Ok((status, result))
485 }
486
487 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path, %writer), err))]
496 pub async fn http_post_action(
497 &mut self,
498 aid: &AgentId,
499 path: String,
500 payload: Vec<u8>,
501 writer: &BorderlessId, ) -> Result<std::result::Result<(Events, CallAction), (u16, String)>> {
503 let Some((instance, mut store)) = self
505 .agent_store
506 .get_agent(aid, &self.engine, &mut self.linker)
507 .await?
508 else {
509 return Ok(Err((
510 StatusCode::BAD_REQUEST.as_u16(),
511 ErrorKind::MissingAgent { aid: *aid }.to_string(),
512 )));
513 };
514 if self.agent_revoked(aid)? {
516 return Ok(Err((
517 StatusCode::BAD_REQUEST.as_u16(),
518 ErrorKind::RevokedAgent { aid: *aid }.to_string(),
519 )));
520 }
521
522 let state = self.mutability_lock.get_lock_state(aid);
523 let _guard = state.lock.lock().await;
524
525 store
529 .data_mut()
530 .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
531
532 store
533 .data_mut()
534 .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
535
536 store
537 .data_mut()
538 .set_register(REGISTER_WRITER, writer.into_bytes().into());
539
540 store
542 .data_mut()
543 .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
544
545 store
547 .data_mut()
548 .prepare_exec(ActiveEntity::agent(*aid, true))?;
549
550 let func = instance.get_typed_func::<(), ()>(&mut store, "http_post_action")?;
552
553 if let Err(e) = func.call_async(&mut store, ()).await {
555 warn!("http_get_state failed with error: {e}");
556 }
557 let status = store.data().get_register(REGISTER_OUTPUT_HTTP_STATUS);
558 let result = store.data().get_register(REGISTER_OUTPUT_HTTP_RESULT);
559 let output = store.data().get_register(REGISTER_OUTPUT);
560
561 let _log = store.data_mut().finish_exec(Some(Commit::Other))?;
564
565 let status = status.ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
567 let status_bytes = status
568 .try_into()
569 .map_err(|_| ErrorKind::InvalidRegisterValue {
570 register: "http-status",
571 expected_type: "u16",
572 })?;
573 let status = u16::from_be_bytes(status_bytes);
574
575 let result = result.ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
577
578 if status == 200 {
579 let events = match output {
580 Some(b) => Events::from_bytes(&b)?,
581 None => Events::default(),
582 };
583 let action = CallAction::from_bytes(&result)?;
584 Ok(Ok((events, action)))
585 } else {
586 let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
587 register: "http-result",
588 expected_type: "string",
589 })?;
590 Ok(Err((status, error)))
591 }
592 }
593
594 pub async fn get_symbols(&mut self, aid: &AgentId) -> Result<Option<Symbols>> {
596 let (instance, mut store) = self
597 .agent_store
598 .get_agent(aid, &self.engine, &mut self.linker)
599 .await?
600 .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
601
602 store.data_mut().prepare_exec(ActiveEntity::None)?;
603
604 if let Err(e) = instance
606 .get_typed_func::<(), ()>(&mut store, "get_symbols")
607 .and_then(|func| func.call(&mut store, ()))
608 {
609 error!("get_symbols failed with error: {e}");
610 }
611 let output = store.data().get_register(REGISTER_OUTPUT);
612 store.data_mut().finish_exec(None)?;
613
614 let bytes = match output {
615 Some(b) => b,
616 None => return Ok(None),
617 };
618 let symbols = Symbols::from_bytes(&bytes)?;
619 Ok(Some(symbols))
620 }
621
622 pub fn available_agents(&self) -> Result<Vec<AgentId>> {
623 self.agent_store.available_swagents()
624 }
625}
626
627#[derive(Default, Clone)]
629pub struct Lock {
630 lock: Arc<Mutex<()>>,
631 ws_sender: Option<mpsc::Sender<Vec<u8>>>,
632}
633
634#[derive(Clone, Default)]
653pub struct MutLock {
654 map: Arc<SyncMutex<HashMap<AgentId, Lock>>>,
655}
656
657impl MutLock {
658 pub fn get_lock_state(&self, aid: &AgentId) -> Lock {
662 let mut map = self.map.lock();
663 let lock = map.entry(*aid).or_default();
664 lock.clone()
665 }
666
667 pub fn insert_ws_sender(&self, aid: &AgentId, ws_sender: mpsc::Sender<Vec<u8>>) {
671 let mut map = self.map.lock();
672 let lock = map.entry(*aid).or_default();
673 assert!(
674 lock.ws_sender.is_none(),
675 "Cannot register websocket twice on the same agent"
676 );
677 lock.ws_sender = Some(ws_sender);
678 }
679}
680
681fn check_module(engine: &Engine, module: &Module) -> Result<()> {
685 let functions = [
686 "on_init",
687 "on_shutdown",
688 "process_action",
689 "process_introduction",
690 "process_revocation",
691 "http_get_state",
692 "http_post_action",
693 "parse_state",
694 "get_symbols",
695 ];
696 for func in functions {
697 let exp = module
698 .get_export(func)
699 .ok_or_else(|| ErrorKind::MissingExport { func })?;
700 if let ExternType::Func(func_type) = exp {
701 if !func_type.matches(&FuncType::new(engine, [], [])) {
702 return Err(ErrorKind::InvalidFuncType { func }.into());
703 }
704 } else {
705 return Err(ErrorKind::InvalidExport { func }.into());
706 }
707 }
708 Ok(())
709}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714
715 const ALL_EXPORTS: &str = r#"
716(module
717 ;; Declare the function `placeholder`
718 (func $placeholder)
719
720 ;; Export the functions so they can be called from outside the module
721 (export "on_init" (func $placeholder))
722 (export "on_shutdown" (func $placeholder))
723 (export "process_action" (func $placeholder))
724 (export "process_introduction" (func $placeholder))
725 (export "process_revocation" (func $placeholder))
726 (export "http_get_state" (func $placeholder))
727 (export "http_post_action" (func $placeholder))
728 (export "parse_state" (func $placeholder))
729 (export "get_symbols" (func $placeholder))
730)
731"#;
/// Returns `original` without the lines that contain `pattern`.
///
/// The remaining lines are joined with `\n`; a trailing newline is not preserved.
fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
    original
        .lines()
        .filter(|line| !line.contains(pattern))
        .collect::<Vec<_>>()
        .join("\n")
}
747
/// `check_module` must reject a module that lacks any single required export and
/// accept the module that provides all of them.
#[test]
fn missing_exports() {
    let mut config = Config::new();
    config
        .cranelift_opt_level(wasmtime::OptLevel::Speed)
        .async_support(false);
    let engine = Engine::new(&config).unwrap();

    let required = [
        "on_init",
        "on_shutdown",
        "process_action",
        "process_introduction",
        "process_revocation",
        "http_get_state",
        "http_post_action",
        "parse_state",
        "get_symbols",
    ];

    // Dropping any single export must make the check fail.
    for name in required {
        let wat = remove_line_with_pattern(ALL_EXPORTS, name);
        let module = Module::new(&engine, &wat);
        assert!(module.is_ok());
        let outcome = check_module(&engine, &module.unwrap());
        assert!(outcome.is_err());
    }

    // The complete module passes.
    let module = Module::new(&engine, &ALL_EXPORTS);
    assert!(module.is_ok());

    let outcome = check_module(&engine, &module.unwrap());
    assert!(outcome.is_ok());
}
780}