Skip to main content

borderless_runtime/rt/
agent.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::agents::Init;
7use borderless::common::{Introduction, Revocation, Symbols};
8use borderless::events::Events;
9use borderless::{events::CallAction, AgentId, BorderlessId};
10use borderless_kv_store::backend::lmdb::Lmdb;
11use borderless_kv_store::Db;
12use http::StatusCode;
13use parking_lot::Mutex as SyncMutex;
14use tokio::sync::{mpsc, Mutex};
15use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module};
16
17use super::vm::{ActiveEntity, Commit};
18use super::{
19    code_store::CodeStore,
20    vm::{self, VmState},
21};
22use crate::db::controller::Controller;
23use crate::log_shim::*;
24use crate::{
25    error::{ErrorKind, Result},
26    AGENT_SUB_DB, SUBSCRIPTION_REL_SUB_DB,
27};
28
29pub mod tasks;
30
31pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
32
33pub struct Runtime<S = Lmdb>
34where
35    S: Db,
36{
37    linker: Linker<VmState<S>>,
38    engine: Engine,
39    agent_store: CodeStore<S>,
40    mutability_lock: MutLock,
41    executor: Option<Vec<u8>>,
42}
43
44impl<S: Db> Runtime<S> {
45    pub fn new(storage: &S, agent_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
46        let start = Instant::now();
47        // Create agent sub-db (in case it does not exist)
48        let _ = storage.create_sub_db(AGENT_SUB_DB)?;
49        let _ = storage.create_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
50
51        // Generate engine ( with async enabled )
52        let mut config = Config::new();
53        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
54        config.async_support(true); // <- BIG difference
55        let engine = Engine::new(&config)?;
56
57        let mut linker: Linker<VmState<S>> = Linker::new(&engine);
58
59        // NOTE: We have to wrap the functions into a closure here, because they must be monomorphized
60        // (as a generic function cannot be made into a function pointer)
61        linker.func_wrap(
62            "env",
63            "print",
64            |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
65        )?;
66        linker.func_wrap(
67            "env",
68            "read_register",
69            |caller: Caller<'_, VmState<S>>, register_id, ptr| {
70                vm::read_register(caller, register_id, ptr)
71            },
72        )?;
73        linker.func_wrap(
74            "env",
75            "register_len",
76            |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
77        )?;
78        linker.func_wrap(
79            "env",
80            "write_register",
81            |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
82                vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
83            },
84        )?;
85        linker.func_wrap(
86            "env",
87            "storage_read",
88            |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
89                vm::storage_read(caller, base_key, sub_key, register_id)
90            },
91        )?;
92        linker.func_wrap(
93            "env",
94            "storage_write",
95            |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
96                vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
97            },
98        )?;
99        linker.func_wrap(
100            "env",
101            "storage_remove",
102            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
103                vm::storage_remove(caller, base_key, sub_key)
104            },
105        )?;
106        linker.func_wrap(
107            "env",
108            "storage_has_key",
109            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
110                vm::storage_has_key(caller, base_key, sub_key)
111            },
112        )?;
113        linker.func_wrap(
114            "env",
115            "storage_cursor",
116            |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
117        )?;
118
119        linker.func_wrap(
120            "env",
121            "subscribe",
122            |caller: Caller<'_, VmState<S>>, wasm_ptr, wasm_len| {
123                vm::subscribe(caller, wasm_ptr, wasm_len)
124            },
125        )?;
126
127        linker.func_wrap(
128            "env",
129            "unsubscribe",
130            |caller: Caller<'_, VmState<S>>, wasm_ptr, wasm_len| {
131                vm::unsubscribe(caller, wasm_ptr, wasm_len)
132            },
133        )?;
134
135        // NOTE: Those functions introduce side-effects;
136        // they should only be used by us or during development of a contract
137        linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
138        linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
139            vm::tic(caller)
140        })?;
141        linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
142            vm::toc(caller)
143        })?;
144        linker.func_wrap("env", "rand", vm::rand)?;
145
146        // --- TODO: Playground for the new async api
147        linker.func_wrap_async(
148            "env",
149            "send_http_rq",
150            |caller: Caller<'_, VmState<S>>, (rq_head, rq_body, rs_head, rs_body, err)| {
151                Box::new(vm::async_abi::send_http_rq(
152                    caller, rq_head, rq_body, rs_head, rs_body, err,
153                ))
154            },
155        )?;
156        linker.func_wrap_async(
157            "env",
158            "send_ws_msg",
159            |caller: Caller<'_, VmState<S>>, (msg_ptr, msg_len)| {
160                Box::new(vm::async_abi::send_ws_msg(caller, msg_ptr, msg_len))
161            },
162        )?;
163
164        linker.func_wrap("env", "timestamp", |caller: Caller<'_, VmState<S>>| {
165            vm::timestamp(caller)
166        })?;
167
168        info!("Initialized runtime in: {:?}", start.elapsed());
169
170        Ok(Self {
171            linker,
172            engine,
173            agent_store,
174            mutability_lock: lock,
175            executor: None,
176        })
177    }
178
179    pub fn into_shared(self) -> Arc<Mutex<Self>> {
180        Arc::new(Mutex::new(self))
181    }
182
183    /// Returns a copy of the underlying db handle
184    pub fn get_db(&self) -> S {
185        self.agent_store.get_db()
186    }
187
188    /// Check whether a sw-agent exists
189    pub fn agent_exists(&self, aid: &AgentId) -> Result<bool> {
190        let db = self.get_db();
191        let controller = Controller::new(&db);
192        controller.agent_exists(aid)
193    }
194
195    /// Check whether a sw-agent is revoked
196    pub fn agent_revoked(&self, aid: &AgentId) -> Result<bool> {
197        let db = self.get_db();
198        let controller = Controller::new(&db);
199        controller.agent_revoked(aid)
200    }
201
202    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%agent_id), err))]
203    pub fn instantiate_sw_agent(&mut self, agent_id: AgentId, module_bytes: &[u8]) -> Result<()> {
204        let module = Module::new(&self.engine, module_bytes)?;
205        check_module(&self.engine, &module)?;
206        self.agent_store.insert_swagent(agent_id, module)?;
207        Ok(())
208    }
209
210    /// Sanity check for introductions
211    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
212    pub async fn check_module_and_state(
213        &mut self,
214        module_bytes: Vec<u8>,
215        state: serde_json::Value,
216    ) -> Result<(bool, Vec<String>)> {
217        let module = Module::new(&self.engine, module_bytes)?;
218        check_module(&self.engine, &module)?;
219        let mut store = self.agent_store.create_store(&self.engine)?;
220        let instance = self.linker.instantiate(&mut store, &module)?;
221
222        // Prepare registers
223        store
224            .data_mut()
225            .set_register(REGISTER_INPUT, state.to_string().into_bytes());
226
227        // Get function
228        let func = instance.get_typed_func::<(), ()>(&mut store, "parse_state")?;
229
230        // Prepare execution
231        store.data_mut().prepare_exec(ActiveEntity::None)?;
232
233        // Call the actual function on the wasm side
234        let success = match func.call_async(&mut store, ()).await {
235            Ok(()) => true,
236            Err(_e) => false,
237        };
238        let log = store.data_mut().finish_exec(None)?;
239        Ok((success, log.into_iter().map(|l| l.msg).collect()))
240    }
241
242    /// Sets the currently active executor
243    ///
244    /// This buffers the [`BorderlessId`] of the executor, to later write it into the dedicated register,
245    /// so that the wasm side can query it.
246    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
247    pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
248        let bytes = executor_id.into_bytes().to_vec();
249        self.executor = Some(bytes);
250        Ok(())
251    }
252
253    /// Registers a new websocket client
254    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
255    pub fn register_ws(&mut self, aid: AgentId) -> Result<mpsc::Receiver<Vec<u8>>> {
256        let (tx, rx) = mpsc::channel(4);
257        self.mutability_lock.insert_ws_sender(&aid, tx);
258        Ok(rx)
259    }
260
261    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
262    pub async fn initialize(&mut self, aid: &AgentId) -> Result<Init> {
263        let (instance, mut store) = self
264            .agent_store
265            .get_agent(aid, &self.engine, &mut self.linker)
266            .await?
267            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
268
269        // Buffered registers
270        store
271            .data_mut()
272            .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
273
274        // Call the actual function on the wasm side
275        let func = instance.get_typed_func::<(), ()>(&mut store, "on_init")?;
276        store
277            .data_mut()
278            .prepare_exec(ActiveEntity::agent(*aid, false))?;
279
280        if let Err(e) = func.call_async(&mut store, ()).await {
281            warn!("initialize failed with error: {e}");
282        }
283        let output = store.data().get_register(REGISTER_OUTPUT);
284        store.data_mut().finish_exec(None)?;
285
286        // Return output events
287        let bytes = output.ok_or_else(|| ErrorKind::MissingRegisterValue("init-output"))?;
288        let init = Init::from_bytes(&bytes)?;
289
290        Ok(init)
291    }
292
293    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
294    pub async fn process_ws_msg(&mut self, aid: &AgentId, msg: Vec<u8>) -> Result<Option<Events>> {
295        self.call_mut(aid, msg, "on_ws_msg", Commit::Other).await
296    }
297
298    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
299    pub async fn on_ws_open(&mut self, aid: &AgentId) -> Result<Option<Events>> {
300        self.call_mut(aid, Vec::new(), "on_ws_open", Commit::Other)
301            .await
302    }
303
304    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
305    pub async fn on_ws_error(&mut self, aid: &AgentId) -> Result<Option<Events>> {
306        self.call_mut(aid, Vec::new(), "on_ws_error", Commit::Other)
307            .await
308    }
309
310    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
311    pub async fn on_ws_close(&mut self, aid: &AgentId) -> Result<Option<Events>> {
312        self.call_mut(aid, Vec::new(), "on_ws_close", Commit::Other)
313            .await
314    }
315
316    // TODO: If the initial state from the introduction cannot be parsed, the agent should *not* be saved !!
317    // Currently, this creates an agent, where decoding the state will constantly explode during runtime !!!
318    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %introduction.id), err))]
319    pub async fn process_introduction(&mut self, introduction: Introduction) -> Result<()> {
320        let aid = match introduction.id {
321            borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
322            borderless::prelude::Id::Agent { agent_id } => agent_id,
323        };
324        // NOTE: The input for the introduction is not the introduction, but only the initial state!
325        // The introduction itself is commited by the VmState
326        let initial_state = introduction.initial_state.to_string().into_bytes();
327        let res = self
328            .call_mut(
329                &aid,
330                initial_state,
331                "process_introduction",
332                Commit::Introduction(introduction),
333            )
334            .await?;
335        assert!(res.is_none(), "introductions should not write events");
336        Ok(())
337    }
338
339    // TODO: Calling process revocation on an already revoked agent should generate an error
340    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %revocation.id), err))]
341    pub async fn process_revocation(&mut self, revocation: Revocation) -> Result<()> {
342        let aid = match revocation.id {
343            borderless::prelude::Id::Contract { .. } => return Err(ErrorKind::InvalidIdType.into()),
344            borderless::prelude::Id::Agent { agent_id } => agent_id,
345        };
346        let input = revocation.to_bytes()?;
347        let res = self
348            .call_mut(
349                &aid,
350                input,
351                "process_revocation",
352                Commit::Revocation(revocation),
353            )
354            .await?;
355        assert!(res.is_none(), "revocations should not write events");
356        Ok(())
357    }
358
359    // OK; Just to get some stuff going; I want to just simply call an action, and execute an http-request with it.
360    // That's more than enough to test stuff out.
361    // TODO: Logging ?
362    #[must_use = "You have to handle the output events of this function"]
363    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid), err))]
364    pub async fn process_action(
365        &mut self,
366        aid: &AgentId,
367        action: CallAction,
368    ) -> Result<Option<Events>> {
369        // Parse action
370        let input = action.to_bytes()?;
371        self.call_mut(aid, input, "process_action", Commit::Other)
372            .await
373    }
374
375    /// Helper function for mutable calls
376    async fn call_mut(
377        &mut self,
378        aid: &AgentId,
379        input: Vec<u8>,
380        method: &'static str,
381        commit: Commit,
382    ) -> Result<Option<Events>> {
383        let (instance, mut store) = self
384            .agent_store
385            .get_agent(aid, &self.engine, &mut self.linker)
386            .await?
387            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
388
389        if self.agent_revoked(aid)? {
390            return Err(ErrorKind::RevokedAgent { aid: *aid }.into());
391        }
392
393        let state = self.mutability_lock.get_lock_state(aid);
394        let _guard = state.lock.lock().await;
395
396        // Prepare registers
397        store.data_mut().set_register(REGISTER_INPUT, input);
398
399        // Buffered registers
400        store
401            .data_mut()
402            .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
403
404        // Inject ws-sender (if any)
405        if let Some(tx) = state.ws_sender {
406            store.data_mut().register_ws(tx)?;
407        }
408
409        // Call the actual function on the wasm side
410        let func = instance.get_typed_func::<(), ()>(&mut store, method)?;
411        store
412            .data_mut()
413            .prepare_exec(ActiveEntity::agent(*aid, true))?;
414
415        let commit = match func.call_async(&mut store, ()).await {
416            Ok(()) => Some(commit),
417            Err(e) => {
418                warn!("{method} failed with error: {e}");
419                None
420            }
421        };
422        let output = store.data().get_register(REGISTER_OUTPUT);
423        let _logs = store.data_mut().finish_exec(commit)?;
424
425        // Return output events
426        match output {
427            Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
428            None => Ok(None),
429        }
430    }
431
432    // --- NOTE: Maybe we should create a separate runtime for the HTTP handling ?
433
434    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path), err))]
435    pub async fn http_get_state(&mut self, aid: &AgentId, path: String) -> Result<(u16, Vec<u8>)> {
436        // Get instance
437        let (instance, mut store) = self
438            .agent_store
439            .get_agent(aid, &self.engine, &mut self.linker)
440            .await?
441            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
442
443        // Prepare registers
444        store
445            .data_mut()
446            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
447
448        // Buffered registers
449        store
450            .data_mut()
451            .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
452
453        // Get function
454        let func = instance.get_typed_func::<(), ()>(&mut store, "http_get_state")?;
455
456        // Prepare execution
457        store
458            .data_mut()
459            .prepare_exec(ActiveEntity::agent(*aid, false))?;
460
461        // Call the function
462        if let Err(e) = func.call_async(&mut store, ()).await {
463            warn!("http_get_state failed with error: {e}");
464        }
465        let status = store.data().get_register(REGISTER_OUTPUT_HTTP_STATUS);
466        let result = store.data().get_register(REGISTER_OUTPUT_HTTP_RESULT);
467
468        // Finish the execution ( and commit nothing )
469        let _log = store.data_mut().finish_exec(None)?;
470
471        // Parse status
472        let status = status.ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
473        let status_bytes = status
474            .try_into()
475            .map_err(|_| ErrorKind::InvalidRegisterValue {
476                register: "http-status",
477                expected_type: "u16",
478            })?;
479        let status = u16::from_be_bytes(status_bytes);
480
481        // Check result
482        let result = result.ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
483
484        Ok((status, result))
485    }
486
487    // TODO: This will directly execute the action and return a list of events
488    //
489    // The question is, what should be returned via the web-api ?
490    /// Uses a POST request to parse and generate a [`CallAction`] object.
491    ///
492    /// The return type is a nested result. The outer result type should convert to a server error,
493    /// as it represents errors in the runtime itself.
494    /// The inner error type comes from the wasm code and contains the error status and message.
495    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(agent_id = %aid, %path, %writer), err))]
496    pub async fn http_post_action(
497        &mut self,
498        aid: &AgentId,
499        path: String,
500        payload: Vec<u8>,
501        writer: &BorderlessId, // TODO: I think the writer makes no sense here and is an artifact
502    ) -> Result<std::result::Result<(Events, CallAction), (u16, String)>> {
503        // Check whether agent exists
504        let Some((instance, mut store)) = self
505            .agent_store
506            .get_agent(aid, &self.engine, &mut self.linker)
507            .await?
508        else {
509            return Ok(Err((
510                StatusCode::BAD_REQUEST.as_u16(),
511                ErrorKind::MissingAgent { aid: *aid }.to_string(),
512            )));
513        };
514        // Check whether agent is revoked
515        if self.agent_revoked(aid)? {
516            return Ok(Err((
517                StatusCode::BAD_REQUEST.as_u16(),
518                ErrorKind::RevokedAgent { aid: *aid }.to_string(),
519            )));
520        }
521
522        let state = self.mutability_lock.get_lock_state(aid);
523        let _guard = state.lock.lock().await;
524
525        // NOTE: We cannot convert the payload into a call-action on-spot, as we might call a nested route.
526        // To be precise - we *could* do it here, but I think it is cleaner to leave this logic up to the wasm module,
527        // as otherwise we may have to duplicate the logic here (and if it changes in the macro, we have to sync this with the code of the runtime etc.).
528        store
529            .data_mut()
530            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
531
532        store
533            .data_mut()
534            .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
535
536        store
537            .data_mut()
538            .set_register(REGISTER_WRITER, writer.into_bytes().into());
539
540        // Buffered registers
541        store
542            .data_mut()
543            .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
544
545        // Prepare mutable execution
546        store
547            .data_mut()
548            .prepare_exec(ActiveEntity::agent(*aid, true))?;
549
550        // Get function
551        let func = instance.get_typed_func::<(), ()>(&mut store, "http_post_action")?;
552
553        // Call the function
554        if let Err(e) = func.call_async(&mut store, ()).await {
555            warn!("http_get_state failed with error: {e}");
556        }
557        let status = store.data().get_register(REGISTER_OUTPUT_HTTP_STATUS);
558        let result = store.data().get_register(REGISTER_OUTPUT_HTTP_RESULT);
559        let output = store.data().get_register(REGISTER_OUTPUT);
560
561        // Finish the execution
562        // NOTE: This will clear all the registers !
563        let _log = store.data_mut().finish_exec(Some(Commit::Other))?;
564
565        // Parse status
566        let status = status.ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
567        let status_bytes = status
568            .try_into()
569            .map_err(|_| ErrorKind::InvalidRegisterValue {
570                register: "http-status",
571                expected_type: "u16",
572            })?;
573        let status = u16::from_be_bytes(status_bytes);
574
575        // Check result
576        let result = result.ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
577
578        if status == 200 {
579            let events = match output {
580                Some(b) => Events::from_bytes(&b)?,
581                None => Events::default(),
582            };
583            let action = CallAction::from_bytes(&result)?;
584            Ok(Ok((events, action)))
585        } else {
586            let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
587                register: "http-result",
588                expected_type: "string",
589            })?;
590            Ok(Err((status, error)))
591        }
592    }
593
594    /// Returns the symbols of the contract
595    pub async fn get_symbols(&mut self, aid: &AgentId) -> Result<Option<Symbols>> {
596        let (instance, mut store) = self
597            .agent_store
598            .get_agent(aid, &self.engine, &mut self.linker)
599            .await?
600            .ok_or_else(|| ErrorKind::MissingAgent { aid: *aid })?;
601
602        store.data_mut().prepare_exec(ActiveEntity::None)?;
603
604        // In case the contract does not export any symbols, just return 'None'
605        if let Err(e) = instance
606            .get_typed_func::<(), ()>(&mut store, "get_symbols")
607            .and_then(|func| func.call(&mut store, ()))
608        {
609            error!("get_symbols failed with error: {e}");
610        }
611        let output = store.data().get_register(REGISTER_OUTPUT);
612        store.data_mut().finish_exec(None)?;
613
614        let bytes = match output {
615            Some(b) => b,
616            None => return Ok(None),
617        };
618        let symbols = Symbols::from_bytes(&bytes)?;
619        Ok(Some(symbols))
620    }
621
622    pub fn available_agents(&self) -> Result<Vec<AgentId>> {
623        self.agent_store.available_swagents()
624    }
625}
626
627// NOTE: We Mis-Use the lock here to also carry persistent state - e.g. for the websocket
628#[derive(Default, Clone)]
629pub struct Lock {
630    lock: Arc<Mutex<()>>,
631    ws_sender: Option<mpsc::Sender<Vec<u8>>>,
632}
633
634/// Global mutability lock for all SW-Agents
635///
636/// Since we can only allow one mutable agent execution at a given time, we need a mechanism to ensure that.
637/// The `MutLock` ensures this on a per-agent basis. It holds `RwLock`s for all agents and provides threadsafe access.
638///
639/// The logic is similar but not identical to rusts ownership rules. While there can be only one read-write (mutable) execution,
640/// there can be multiple read-only (immutable) executions even if there is an ongoing read-write execution !
641/// The reason behind this is basically that read-only executions do not produce storage operations that would change the state in the database.
642/// In the `VmState`, all write operations are buffered until the execution is finished. If there would be two executions in parallel,
643/// we might end up commiting changes to a state, that has already changed under the hood - which is not what we want.
644/// However, if there is a writer thread, the readers do not care, and also the writer does not care about the readers.
645/// The readers will use the old state, until the new one is commited by the runtime.
646///
647/// Note: In contrast to [`borderless_runtime::rt::contract::MutLock`],
648/// this version uses asynchronous locks for the agents, and a synchronous lock only for the access of the hashmap.
649///
650/// Additionally, this double-functions as the provider of over-arching state per agent,
651/// e.g. the lock also contains the websocket sender, for agents that use websockets.
652#[derive(Clone, Default)]
653pub struct MutLock {
654    map: Arc<SyncMutex<HashMap<AgentId, Lock>>>,
655}
656
657impl MutLock {
658    /// Returns the `RwLock` for the given agent.
659    ///
660    /// If the agent-id is unknown, a new lock is created.
661    pub fn get_lock_state(&self, aid: &AgentId) -> Lock {
662        let mut map = self.map.lock();
663        let lock = map.entry(*aid).or_default();
664        lock.clone()
665    }
666
667    /// Inserts a new ws-sender for the agent
668    ///
669    /// Panics if the sender already contained a lock
670    pub fn insert_ws_sender(&self, aid: &AgentId, ws_sender: mpsc::Sender<Vec<u8>>) {
671        let mut map = self.map.lock();
672        let lock = map.entry(*aid).or_default();
673        assert!(
674            lock.ws_sender.is_none(),
675            "Cannot register websocket twice on the same agent"
676        );
677        lock.ws_sender = Some(ws_sender);
678    }
679}
680
681// NOTE: We could also check, if the websocket functions are exported,
682// and do a consistency check, if the module uses a websocket.
683// But maybe that's overkill.
684fn check_module(engine: &Engine, module: &Module) -> Result<()> {
685    let functions = [
686        "on_init",
687        "on_shutdown",
688        "process_action",
689        "process_introduction",
690        "process_revocation",
691        "http_get_state",
692        "http_post_action",
693        "parse_state",
694        "get_symbols",
695    ];
696    for func in functions {
697        let exp = module
698            .get_export(func)
699            .ok_or_else(|| ErrorKind::MissingExport { func })?;
700        if let ExternType::Func(func_type) = exp {
701            if !func_type.matches(&FuncType::new(engine, [], [])) {
702                return Err(ErrorKind::InvalidFuncType { func }.into());
703            }
704        } else {
705            return Err(ErrorKind::InvalidExport { func }.into());
706        }
707    }
708    Ok(())
709}
710
711#[cfg(test)]
712mod tests {
713    use super::*;
714
715    const ALL_EXPORTS: &str = r#"
716(module
717  ;; Declare the function `placeholder`
718  (func $placeholder)
719
720  ;; Export the functions so they can be called from outside the module
721  (export "on_init" (func $placeholder))
722  (export "on_shutdown" (func $placeholder))
723  (export "process_action" (func $placeholder))
724  (export "process_introduction" (func $placeholder))
725  (export "process_revocation" (func $placeholder))
726  (export "http_get_state" (func $placeholder))
727  (export "http_post_action" (func $placeholder))
728  (export "parse_state" (func $placeholder))
729  (export "get_symbols" (func $placeholder))
730)
731"#;
732    fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
733        // Create a new Vec to hold the processed lines
734        let mut new_lines = Vec::new();
735
736        for line in original.lines() {
737            // Check if the line contains the pattern
738            if !line.contains(pattern) {
739                // Otherwise, push the original line
740                new_lines.push(line);
741            }
742        }
743
744        // Collect the lines back into a single string
745        new_lines.join("\n")
746    }
747
748    #[test]
749    fn missing_exports() {
750        let mut config = Config::new();
751        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
752        config.async_support(false);
753        let engine = Engine::new(&config).unwrap();
754
755        // These are the functions, that must not be missing
756        let functions = [
757            "on_init",
758            "on_shutdown",
759            "process_action",
760            "process_introduction",
761            "process_revocation",
762            "http_get_state",
763            "http_post_action",
764            "parse_state",
765            "get_symbols",
766        ];
767        for func in functions {
768            let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
769            let module = Module::new(&engine, &wat_missing);
770            assert!(module.is_ok());
771            let err = check_module(&engine, &module.unwrap());
772            assert!(err.is_err());
773        }
774        let module = Module::new(&engine, &ALL_EXPORTS);
775        assert!(module.is_ok());
776
777        let err = check_module(&engine, &module.unwrap());
778        assert!(err.is_ok());
779    }
780}