borderless_runtime/rt/
agent.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::agents::Init;
7use borderless::common::{Introduction, Revocation, Symbols};
8use borderless::events::Events;
9use borderless::{events::CallAction, AgentId, BorderlessId};
10use borderless_kv_store::backend::lmdb::Lmdb;
11use borderless_kv_store::Db;
12use parking_lot::Mutex as SyncMutex;
13use tokio::sync::{mpsc, Mutex};
14use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module, Store};
15
16use super::vm::AgentCommit;
17use super::{
18    code_store::CodeStore,
19    vm::{self, VmState},
20};
21use crate::log_shim::*;
22use crate::{
23    db::logger,
24    error::{ErrorKind, Result},
25    AGENT_SUB_DB,
26};
27
28pub mod tasks;
29
30pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
31
32pub struct Runtime<S = Lmdb>
33where
34    S: Db,
35{
36    linker: Linker<VmState<S>>,
37    store: Store<VmState<S>>,
38    engine: Engine,
39    agent_store: CodeStore<S>,
40    mutability_lock: MutLock,
41}
42
43impl<S: Db> Runtime<S> {
44    pub fn new(storage: &S, agent_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
45        let db_ptr = storage.create_sub_db(AGENT_SUB_DB)?;
46        let start = Instant::now();
47        let state = VmState::new_async(storage.clone(), db_ptr);
48
49        let mut config = Config::new();
50        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
51        config.async_support(true); // <- BIG difference
52        let engine = Engine::new(&config)?;
53        // let module = Module::from_file(&engine, contract_path)?;
54
55        let mut linker: Linker<VmState<S>> = Linker::new(&engine);
56
57        // NOTE: We have to wrap the functions into a closure here, because they must be monomorphized
58        // (as a generic function cannot be made into a function pointer)
59        linker.func_wrap(
60            "env",
61            "print",
62            |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
63        )?;
64        linker.func_wrap(
65            "env",
66            "read_register",
67            |caller: Caller<'_, VmState<S>>, register_id, ptr| {
68                vm::read_register(caller, register_id, ptr)
69            },
70        )?;
71        linker.func_wrap(
72            "env",
73            "register_len",
74            |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
75        )?;
76        linker.func_wrap(
77            "env",
78            "write_register",
79            |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
80                vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
81            },
82        )?;
83        linker.func_wrap(
84            "env",
85            "storage_read",
86            |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
87                vm::storage_read(caller, base_key, sub_key, register_id)
88            },
89        )?;
90        linker.func_wrap(
91            "env",
92            "storage_write",
93            |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
94                vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
95            },
96        )?;
97        linker.func_wrap(
98            "env",
99            "storage_remove",
100            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
101                vm::storage_remove(caller, base_key, sub_key)
102            },
103        )?;
104        linker.func_wrap(
105            "env",
106            "storage_has_key",
107            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
108                vm::storage_has_key(caller, base_key, sub_key)
109            },
110        )?;
111        linker.func_wrap(
112            "env",
113            "storage_cursor",
114            |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
115        )?;
116
117        // NOTE: Those functions introduce side-effects;
118        // they should only be used by us or during development of a contract
119        linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
120        linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
121            vm::tic(caller)
122        })?;
123        linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
124            vm::toc(caller)
125        })?;
126        linker.func_wrap("env", "rand", vm::rand)?;
127
128        // --- TODO: Playground for the new async api
129        linker.func_wrap_async(
130            "env",
131            "send_http_rq",
132            |caller: Caller<'_, VmState<S>>, (rq_head, rq_body, rs_head, rs_body, err)| {
133                Box::new(vm::async_abi::send_http_rq(
134                    caller, rq_head, rq_body, rs_head, rs_body, err,
135                ))
136            },
137        )?;
138        linker.func_wrap_async(
139            "env",
140            "send_ws_msg",
141            |caller: Caller<'_, VmState<S>>, (msg_ptr, msg_len)| {
142                Box::new(vm::async_abi::send_ws_msg(caller, msg_ptr, msg_len))
143            },
144        )?;
145
146        linker.func_wrap("env", "timestamp", vm::timestamp)?;
147
148        let store = Store::new(&engine, state);
149
150        info!("Initialized runtime in: {:?}", start.elapsed());
151
152        Ok(Self {
153            linker,
154            store,
155            engine,
156            agent_store,
157            mutability_lock: lock,
158        })
159    }
160
161    pub fn into_shared(self) -> Arc<Mutex<Self>> {
162        Arc::new(Mutex::new(self))
163    }
164
165    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%agent_id), err))]
166    pub fn instantiate_sw_agent(&mut self, agent_id: AgentId, module_bytes: &[u8]) -> Result<()> {
167        let module = Module::new(&self.engine, module_bytes)?;
168        check_module(&self.engine, &module)?;
169        self.agent_store.insert_swagent(agent_id, module)?;
170        Ok(())
171    }
172
173    /// Sets the currently active executor
174    ///
175    /// This writes the [`BorderlessId`] of the executor to the dedicated register, so that the wasm side can query it.
176    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
177    pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
178        let bytes = executor_id.into_bytes().to_vec();
179        self.store.data_mut().set_register(REGISTER_EXECUTOR, bytes);
180        Ok(())
181    }
182
183    /// Registers a new websocket client
184    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
185    pub fn register_ws(&mut self, aid: AgentId) -> Result<mpsc::Receiver<Vec<u8>>> {
186        let (tx, rx) = mpsc::channel(4);
187        self.store.data_mut().register_ws(aid, tx)?;
188        Ok(rx)
189    }
190
191    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
192    pub async fn initialize(&mut self, aid: &AgentId) -> Result<Init> {
193        let instance = self
194            .agent_store
195            .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
196            .await?
197            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
198
199        // Call the actual function on the wasm side
200        let func = instance.get_typed_func::<(), ()>(&mut self.store, "on_init")?;
201        self.store.data_mut().begin_agent_exec(*aid, false)?;
202
203        if let Err(e) = func.call_async(&mut self.store, ()).await {
204            warn!("initialize failed with error: {e}");
205        }
206        self.store.data_mut().finish_agent_exec(None)?;
207
208        // Return output events
209        let bytes = self
210            .store
211            .data()
212            .get_register(REGISTER_OUTPUT)
213            .ok_or_else(|| ErrorKind::MissingRegisterValue("init-output"))?;
214        let init = Init::from_bytes(&bytes)?;
215
216        Ok(init)
217    }
218
219    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
220    pub async fn process_ws_msg(&mut self, aid: &AgentId, msg: Vec<u8>) -> Result<Option<Events>> {
221        self.call_mut(aid, msg, "on_ws_msg", AgentCommit::Other)
222            .await
223    }
224
225    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
226    pub async fn on_ws_open(&mut self, aid: &AgentId) -> Result<Option<Events>> {
227        self.call_mut(aid, Vec::new(), "on_ws_open", AgentCommit::Other)
228            .await
229    }
230
231    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
232    pub async fn on_ws_error(&mut self, aid: &AgentId) -> Result<Option<Events>> {
233        self.call_mut(aid, Vec::new(), "on_ws_error", AgentCommit::Other)
234            .await
235    }
236
237    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
238    pub async fn on_ws_close(&mut self, aid: &AgentId) -> Result<Option<Events>> {
239        self.call_mut(aid, Vec::new(), "on_ws_close", AgentCommit::Other)
240            .await
241    }
242
243    // TODO: If the initial state from the introduction cannot be parsed, the agent should *not* be saved !!
244    // Currently, this creates an agent, where decoding the state will constantly explode during runtime !!!
245    //
246    // TODO: Calling process introduction on an already introduced agent should generate an error
247    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %introduction.id), err))]
248    pub async fn process_introduction(&mut self, introduction: Introduction) -> Result<()> {
249        let aid = match introduction.id {
250            borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
251            borderless::prelude::Id::Agent { agent_id } => agent_id,
252        };
253        // NOTE: The input for the introduction is not the introduction, but only the initial state!
254        // The introduction itself is commited by the VmState
255        let initial_state = introduction.initial_state.to_string().into_bytes();
256        let res = self
257            .call_mut(
258                &aid,
259                initial_state,
260                "process_introduction",
261                AgentCommit::Introduction { introduction },
262            )
263            .await?;
264        assert!(res.is_none(), "introductions should not write events");
265        Ok(())
266    }
267
268    // TODO: Calling process revocation on an already revoked agent should generate an error
269    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %revocation.id), err))]
270    pub async fn process_revocation(&mut self, revocation: Revocation) -> Result<()> {
271        let aid = match revocation.id {
272            borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
273            borderless::prelude::Id::Agent { agent_id } => agent_id,
274        };
275        // NOTE: The input for the introduction is not the introduction, but only the initial state!
276        // The introduction itself is commited by the VmState
277        let input = revocation.to_bytes()?;
278        let res = self
279            .call_mut(
280                &aid,
281                input,
282                "process_revocation",
283                AgentCommit::Revocation { revocation },
284            )
285            .await?;
286        assert!(res.is_none(), "revocations should not write events");
287        Ok(())
288    }
289
290    // OK; Just to get some stuff going; I want to just simply call an action, and execute an http-request with it.
291    // That's more than enough to test stuff out.
292    // TODO: Logging ?
293    #[must_use = "You have to handle the output events of this function"]
294    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
295    pub async fn process_action(
296        &mut self,
297        aid: &AgentId,
298        action: CallAction,
299    ) -> Result<Option<Events>> {
300        // Parse action
301        let input = action.to_bytes()?;
302        self.call_mut(aid, input, "process_action", AgentCommit::Other)
303            .await
304    }
305
306    /// Helper function for mutable calls
307    async fn call_mut(
308        &mut self,
309        aid: &AgentId,
310        input: Vec<u8>,
311        method: &'static str,
312        commit: AgentCommit,
313    ) -> Result<Option<Events>> {
314        let instance = self
315            .agent_store
316            .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
317            .await?
318            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
319
320        let lock = self.mutability_lock.get_lock(aid);
321        let _guard = lock.lock().await;
322
323        // Prepare registers
324        self.store.data_mut().set_register(REGISTER_INPUT, input);
325
326        // Call the actual function on the wasm side
327        let func = instance.get_typed_func::<(), ()>(&mut self.store, method)?;
328        self.store.data_mut().begin_agent_exec(*aid, true)?;
329
330        let _logs = match func.call_async(&mut self.store, ()).await {
331            Ok(()) => self.store.data_mut().finish_agent_exec(Some(commit))?,
332            Err(e) => {
333                warn!("{method} failed with error: {e}");
334                self.store.data_mut().finish_agent_exec(None)?
335            }
336        };
337        // Just print the logs here
338        // logs.into_iter().for_each(print_log_line);
339
340        // Return output events
341        match self.store.data().get_register(REGISTER_OUTPUT) {
342            Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
343            None => Ok(None),
344        }
345    }
346
347    // --- NOTE: Maybe we should create a separate runtime for the HTTP handling ?
348
349    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path), err))]
350    pub async fn http_get_state(&mut self, aid: &AgentId, path: String) -> Result<(u16, Vec<u8>)> {
351        // Get instance
352        let instance = self
353            .agent_store
354            .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
355            .await?
356            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
357
358        // Prepare registers
359        self.store
360            .data_mut()
361            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
362
363        // Get function
364        let func = instance.get_typed_func::<(), ()>(&mut self.store, "http_get_state")?;
365
366        // Call the function
367        self.store.data_mut().begin_agent_exec(*aid, false)?;
368        if let Err(e) = func.call_async(&mut self.store, ()).await {
369            warn!("http_get_state failed with error: {e}");
370        }
371        // Finish the execution
372        let log = self.store.data_mut().finish_agent_exec(None)?;
373
374        let status = self
375            .store
376            .data()
377            .get_register(REGISTER_OUTPUT_HTTP_STATUS)
378            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
379        let status = u16::from_be_bytes(status.try_into().unwrap());
380
381        let result = self
382            .store
383            .data()
384            .get_register(REGISTER_OUTPUT_HTTP_RESULT)
385            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
386
387        // Print the log
388        for l in log {
389            logger::print_log_line(l);
390        }
391        Ok((status, result))
392    }
393
394    // TODO: This will directly execute the action and return a list of events
395    //
396    // The question is, what should be returned via the web-api ?
397    /// Uses a POST request to parse and generate a [`CallAction`] object.
398    ///
399    /// The return type is a nested result. The outer result type should convert to a server error,
400    /// as it represents errors in the runtime itself.
401    /// The inner error type comes from the wasm code and contains the error status and message.
402    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path, %writer), err))]
403    pub async fn http_post_action(
404        &mut self,
405        aid: &AgentId,
406        path: String,
407        payload: Vec<u8>,
408        writer: &BorderlessId,
409    ) -> Result<std::result::Result<(Events, CallAction), (u16, String)>> {
410        let instance = self
411            .agent_store
412            .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
413            .await?
414            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
415
416        let lock = self.mutability_lock.get_lock(aid);
417        let _guard = lock.lock().await;
418
419        // NOTE: We cannot convert the payload into a call-action on-spot, as we might call a nested route.
420        // To be precise - we *could* do it here, but I think it is cleaner to leave this logic up to the wasm module,
421        // as otherwise we may have to duplicate the logic here (and if it changes in the macro, we have to sync this with the code of the runtime etc.).
422        self.store
423            .data_mut()
424            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
425
426        self.store
427            .data_mut()
428            .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
429
430        self.store
431            .data_mut()
432            .set_register(REGISTER_WRITER, writer.into_bytes().into());
433
434        // Get function
435        let func = instance.get_typed_func::<(), ()>(&mut self.store, "http_post_action")?;
436
437        // Call the function
438        self.store.data_mut().begin_agent_exec(*aid, true)?;
439        if let Err(e) = func.call_async(&mut self.store, ()).await {
440            warn!("http_get_state failed with error: {e}");
441        }
442        // Finish the execution
443        let log = self
444            .store
445            .data_mut()
446            .finish_agent_exec(Some(AgentCommit::Other))?;
447
448        let status = self
449            .store
450            .data()
451            .get_register(REGISTER_OUTPUT_HTTP_STATUS)
452            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
453        let status = u16::from_be_bytes(status.try_into().unwrap());
454
455        let result = self
456            .store
457            .data()
458            .get_register(REGISTER_OUTPUT_HTTP_RESULT)
459            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
460
461        // Print the log
462        for l in log {
463            logger::print_log_line(l);
464        }
465
466        if status == 200 {
467            let events = match self.store.data().get_register(REGISTER_OUTPUT) {
468                Some(b) => Events::from_bytes(&b)?,
469                None => Events::default(),
470            };
471            let action = CallAction::from_bytes(&result)?;
472            Ok(Ok((events, action)))
473        } else {
474            let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
475                register: "http-result",
476                expected_type: "string",
477            })?;
478            Ok(Err((status, error)))
479        }
480    }
481
482    /// Returns the symbols of the contract
483    pub async fn get_symbols(&mut self, aid: &AgentId) -> Result<Option<Symbols>> {
484        let instance = self
485            .agent_store
486            .get_agent(aid, &self.engine, &mut self.store, &mut self.linker)
487            .await?
488            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
489
490        // Get function
491        let func = instance.get_typed_func::<(), ()>(&mut self.store, "get_symbols")?;
492
493        // Call the function
494        self.store.data_mut().begin_agent_exec(*aid, false)?;
495        if let Err(e) = func.call_async(&mut self.store, ()).await {
496            warn!("http_get_state failed with error: {e}");
497        }
498        // Finish the execution
499        self.store.data_mut().finish_agent_exec(None)?;
500
501        let bytes = match self.store.data().get_register(REGISTER_OUTPUT) {
502            Some(b) => b,
503            None => return Ok(None),
504        };
505        let symbols = Symbols::from_bytes(&bytes)?;
506        Ok(Some(symbols))
507    }
508
509    pub fn available_agents(&self) -> Result<Vec<AgentId>> {
510        self.agent_store.available_swagents()
511    }
512}
513
514type Lock = Arc<Mutex<()>>;
515
516/// Global mutability lock for all SW-Agents
517///
518/// Since we can only allow one mutable agent execution at a given time, we need a mechanism to ensure that.
519/// The `MutLock` ensures this on a per-agent basis. It holds `RwLock`s for all agents and provides threadsafe access.
520///
521/// The logic is similar but not identical to rusts ownership rules. While there can be only one read-write (mutable) execution,
522/// there can be multiple read-only (immutable) executions even if there is an ongoing read-write execution !
523/// The reason behind this is basically that read-only executions do not produce storage operations that would change the state in the database.
524/// In the `VmState`, all write operations are buffered until the execution is finished. If there would be two executions in parallel,
525/// we might end up commiting changes to a state, that has already changed under the hood - which is not what we want.
526/// However, if there is a writer thread, the readers do not care, and also the writer does not care about the readers.
527/// The readers will use the old state, until the new one is commited by the runtime.
528///
529/// Note: In contrast to [`borderless_runtime::rt::contract::MutLock`],
530/// this version uses asynchronous locks for the agents, and a synchronous lock only for the access of the hashmap.
531#[derive(Clone, Default)]
532pub struct MutLock {
533    map: Arc<SyncMutex<HashMap<AgentId, Lock>>>,
534}
535
536impl MutLock {
537    /// Returns the `RwLock` for the given agent.
538    ///
539    /// If the agent-id is unknown, a new lock is created.
540    pub fn get_lock(&self, aid: &AgentId) -> Lock {
541        let mut map = self.map.lock();
542        let lock = map.entry(*aid).or_default();
543        lock.clone()
544    }
545}
546
547// NOTE: We could also check, if the websocket functions are exported,
548// and do a consistency check, if the module uses a websocket.
549// But maybe that's overkill.
550fn check_module(engine: &Engine, module: &Module) -> Result<()> {
551    let functions = [
552        "on_init",
553        "on_shutdown",
554        "process_action",
555        "process_introduction",
556        "process_revocation",
557        "http_get_state",
558        "http_post_action",
559        "get_symbols",
560    ];
561    for func in functions {
562        let exp = module
563            .get_export(func)
564            .ok_or_else(|| ErrorKind::MissingExport { func })?;
565        if let ExternType::Func(func_type) = exp {
566            if !func_type.matches(&FuncType::new(engine, [], [])) {
567                return Err(ErrorKind::InvalidFuncType { func }.into());
568            }
569        } else {
570            return Err(ErrorKind::InvalidExport { func }.into());
571        }
572    }
573    Ok(())
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579
580    const ALL_EXPORTS: &str = r#"
581(module
582  ;; Declare the function `placeholder`
583  (func $placeholder)
584
585  ;; Export the functions so they can be called from outside the module
586  (export "on_init" (func $placeholder))
587  (export "on_shutdown" (func $placeholder))
588  (export "process_action" (func $placeholder))
589  (export "process_introduction" (func $placeholder))
590  (export "process_revocation" (func $placeholder))
591  (export "http_get_state" (func $placeholder))
592  (export "http_post_action" (func $placeholder))
593  (export "get_symbols" (func $placeholder))
594)
595"#;
596    fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
597        // Create a new Vec to hold the processed lines
598        let mut new_lines = Vec::new();
599
600        for line in original.lines() {
601            // Check if the line contains the pattern
602            if !line.contains(pattern) {
603                // Otherwise, push the original line
604                new_lines.push(line);
605            }
606        }
607
608        // Collect the lines back into a single string
609        new_lines.join("\n")
610    }
611
612    #[test]
613    fn missing_exports() {
614        let mut config = Config::new();
615        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
616        config.async_support(false);
617        let engine = Engine::new(&config).unwrap();
618
619        // These are the functions, that must not be missing
620        let functions = [
621            "on_init",
622            "on_shutdown",
623            "process_action",
624            "process_introduction",
625            "process_revocation",
626            "http_get_state",
627            "http_post_action",
628            "get_symbols",
629        ];
630        for func in functions {
631            let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
632            let module = Module::new(&engine, &wat_missing);
633            assert!(module.is_ok());
634            let err = check_module(&engine, &module.unwrap());
635            assert!(err.is_err());
636        }
637        let module = Module::new(&engine, &ALL_EXPORTS);
638        assert!(module.is_ok());
639
640        let err = check_module(&engine, &module.unwrap());
641        assert!(err.is_ok());
642    }
643}