borderless_runtime/rt/
contract.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::common::{Introduction, Revocation, Symbols};
7use borderless::contracts::{BlockCtx, TxCtx};
8use borderless::events::Events;
9use borderless::{events::CallAction, ContractId};
10use borderless::{BlockIdentifier, BorderlessId};
11use borderless_kv_store::backend::lmdb::Lmdb;
12use borderless_kv_store::Db;
13use parking_lot::Mutex;
14use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module};
15
16use super::vm::{ActiveEntity, Commit};
17use super::{
18    code_store::CodeStore,
19    vm::{self, VmState},
20};
21use crate::{
22    error::{ErrorKind, Result},
23    CONTRACT_SUB_DB,
24};
25use crate::{log_shim::*, LEDGER_SUB_DB};
26use crate::{ACTION_TX_REL_SUB_DB, SUBSCRIPTION_REL_SUB_DB};
27
28pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
29
30/*
31 * Runtime TODO's:
32 * - use one global engine for all runtimes <- per runtime type !
33 * - make the Store a short lived object
34 * - use per-runtime caching (as an Instance is bound to the Store)
35 * - invalidate the cache, when re-creating the store
36 * - check State::decode before introducing
37 *
38 */
39
40pub struct Runtime<S = Lmdb>
41where
42    S: Db,
43{
44    linker: Linker<VmState<S>>,
45    engine: Engine,
46    contract_store: CodeStore<S>,
47    mutability_lock: MutLock,
48    block_ctx: Option<Vec<u8>>,
49    executor: Option<Vec<u8>>,
50}
51
52impl<S: Db> Runtime<S> {
53    pub fn new(storage: &S, contract_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
54        let start = Instant::now();
55        // We create all necessary dub-databases, in case they don't exist
56        let _ = storage.create_sub_db(CONTRACT_SUB_DB)?;
57        let _ = storage.create_sub_db(ACTION_TX_REL_SUB_DB)?;
58        let _ = storage.create_sub_db(LEDGER_SUB_DB)?;
59        let _ = storage.create_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
60
61        // Generate engine ( without async support )
62        let mut config = Config::new();
63        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
64        config.async_support(false);
65        let engine = Engine::new(&config)?;
66
67        let mut linker: Linker<VmState<S>> = Linker::new(&engine);
68
69        // NOTE: We have to wrap the functions into a closure here, because they must be monomorphized
70        // (as a generic function cannot be made into a function pointer)
71        linker.func_wrap(
72            "env",
73            "print",
74            |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
75        )?;
76        // -- Register-API
77        linker.func_wrap(
78            "env",
79            "read_register",
80            |caller: Caller<'_, VmState<S>>, register_id, ptr| {
81                vm::read_register(caller, register_id, ptr)
82            },
83        )?;
84        linker.func_wrap(
85            "env",
86            "register_len",
87            |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
88        )?;
89        linker.func_wrap(
90            "env",
91            "write_register",
92            |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
93                vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
94            },
95        )?;
96        // -- Storage-API
97        linker.func_wrap(
98            "env",
99            "storage_read",
100            |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
101                vm::storage_read(caller, base_key, sub_key, register_id)
102            },
103        )?;
104        linker.func_wrap(
105            "env",
106            "storage_write",
107            |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
108                vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
109            },
110        )?;
111        linker.func_wrap(
112            "env",
113            "storage_remove",
114            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
115                vm::storage_remove(caller, base_key, sub_key)
116            },
117        )?;
118        linker.func_wrap(
119            "env",
120            "storage_has_key",
121            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
122                vm::storage_has_key(caller, base_key, sub_key)
123            },
124        )?;
125        linker.func_wrap(
126            "env",
127            "storage_cursor",
128            |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
129        )?;
130        linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
131
132        // NOTE: The timestamp uses the timestamp from the block-ctx, so no side-effect here
133        linker.func_wrap("env", "timestamp", |caller: Caller<'_, VmState<S>>| {
134            vm::timestamp(caller)
135        })?;
136
137        // -- Ledger-API
138        linker.func_wrap(
139            "env",
140            "create_ledger_entry",
141            |caller: Caller<'_, VmState<S>>, wasm_ptr, wasm_len| {
142                vm::create_ledger_entry(caller, wasm_ptr, wasm_len)
143            },
144        )?;
145
146        // NOTE: Those functions introduce side-effects;
147        // they should only be used by us or during development of a contract
148        linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
149            vm::tic(caller)
150        })?;
151        linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
152            vm::toc(caller)
153        })?;
154        linker.func_wrap("env", "rand", vm::rand)?;
155
156        info!("Initialized runtime in: {:?}", start.elapsed());
157
158        Ok(Self {
159            linker,
160            engine,
161            contract_store,
162            mutability_lock: lock,
163            block_ctx: None,
164            executor: None,
165        })
166    }
167
168    pub fn into_shared(self) -> Arc<Mutex<Self>> {
169        Arc::new(Mutex::new(self))
170    }
171
172    /// Creates a new instance of the wasm module in our [`CodeStore`] for the given contract-id
173    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%contract_id), err))]
174    pub fn instantiate_contract(
175        &mut self,
176        contract_id: ContractId,
177        module_bytes: &[u8],
178    ) -> Result<()> {
179        let module = Module::new(&self.engine, module_bytes)?;
180        check_module(&self.engine, &module)?;
181        self.contract_store.insert_contract(contract_id, module)?;
182        Ok(())
183    }
184
185    /// Sets the currently active block
186    ///
187    /// This buffers the encoded [`BlockCtx`], to later write it to the dedicated register, so that the wasm side can query it.
188    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%block_id), err))]
189    pub fn set_block(&mut self, block_id: BlockIdentifier, block_timestamp: u64) -> Result<()> {
190        let ctx = BlockCtx {
191            block_id,
192            timestamp: block_timestamp,
193        };
194        self.block_ctx = Some(ctx.to_bytes()?);
195        Ok(())
196    }
197
198    /// Sanity check for introductions
199    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
200    pub fn check_module_and_state(
201        &mut self,
202        module_bytes: Vec<u8>,
203        state: serde_json::Value,
204    ) -> Result<(bool, Vec<String>)> {
205        let module = Module::new(&self.engine, module_bytes)?;
206        check_module(&self.engine, &module)?;
207        let mut store = self.contract_store.create_store(&self.engine)?;
208        let instance = self.linker.instantiate(&mut store, &module)?;
209
210        // Prepare registers
211        store
212            .data_mut()
213            .set_register(REGISTER_INPUT, state.to_string().into_bytes());
214
215        // Call the actual function on the wasm side
216        store.data_mut().prepare_exec(ActiveEntity::None)?;
217        let success = match instance
218            .get_typed_func::<(), ()>(&mut store, "parse_state")
219            .and_then(|func| func.call(&mut store, ()))
220        {
221            Ok(()) => true,
222            Err(_e) => false,
223        };
224        let log = store.data_mut().finish_exec(None)?;
225        Ok((success, log.into_iter().map(|l| l.msg).collect()))
226    }
227
228    /// Sets the currently active executor
229    ///
230    /// This buffers the [`BorderlessId`] of the executor, to later write it into the dedicated register,
231    /// so that the wasm side can query it.
232    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
233    pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
234        let bytes = executor_id.into_bytes().to_vec();
235        self.executor = Some(bytes);
236        Ok(())
237    }
238
239    #[must_use = "You have to handle the output events of this function"]
240    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
241    pub fn process_transaction(
242        &mut self,
243        cid: &ContractId,
244        action: CallAction,
245        writer: &BorderlessId,
246        tx_ctx: TxCtx,
247    ) -> Result<Option<Events>> {
248        let input = action.to_bytes()?;
249        let events =
250            self.process_chain_tx(*cid, input, *writer, tx_ctx, Some(Commit::Action(action)))?;
251        Ok(events)
252    }
253
254    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %introduction.id, %writer), err))]
255    pub fn process_introduction(
256        &mut self,
257        introduction: Introduction,
258        writer: &BorderlessId,
259        tx_ctx: TxCtx,
260    ) -> Result<()> {
261        let cid = match introduction.id {
262            borderless::prelude::Id::Contract { contract_id } => contract_id,
263            borderless::prelude::Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
264        };
265        // NOTE: The input for the introduction is not the introduction, but only the initial state!
266        // The introduction itself is commited by the VmState
267        let initial_state = introduction.initial_state.to_string().into_bytes();
268        self.process_chain_tx(
269            cid,
270            initial_state,
271            *writer,
272            tx_ctx,
273            Some(Commit::Introduction(introduction)),
274        )?;
275        Ok(())
276    }
277
278    // TODO: Calling process introduction on an already revoked contract should generate an error
279    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %revocation.id, %writer), err))]
280    pub fn process_revocation(
281        &mut self,
282        revocation: Revocation,
283        writer: &BorderlessId,
284        tx_ctx: TxCtx,
285    ) -> Result<()> {
286        let input = revocation.to_bytes()?;
287        let cid = match revocation.id {
288            borderless::prelude::Id::Contract { contract_id } => contract_id,
289            borderless::prelude::Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
290        };
291        self.process_chain_tx(
292            cid,
293            input,
294            *writer,
295            tx_ctx,
296            Some(Commit::Revocation(revocation)),
297        )?;
298        Ok(())
299    }
300
301    // TODO: Return Option<Events> to have None or use Events::default() ?
302    /// Abstraction over all possible chain transactions
303    ///
304    /// In case of an error, the `VmState` is reset by this function.
305    fn process_chain_tx(
306        &mut self,
307        cid: ContractId,
308        input: Vec<u8>,
309        writer: BorderlessId,
310        tx_ctx: TxCtx,
311        commit: Option<Commit>,
312    ) -> Result<Option<Events>> {
313        let tx_ctx_bytes = tx_ctx.to_bytes()?;
314        let (instance, mut store) = self
315            .contract_store
316            .get_contract(&cid, &self.engine, &mut self.linker)?
317            .ok_or_else(|| ErrorKind::MissingContract { cid })?;
318
319        let mtx = self.mutability_lock.get_lock(&cid);
320        let _guard = mtx.lock();
321
322        let contract_method = match &commit {
323            Some(Commit::Action(_)) => "process_transaction",
324            Some(Commit::Introduction(_)) => "process_introduction",
325            Some(Commit::Revocation(_)) => "process_revocation",
326            Some(Commit::Other) => panic!("Commit::Other is reserved for actions"),
327            None => "process_transaction", // NOTE: None is used for dry-runs of transactions
328        };
329
330        // Prepare registers
331        store.data_mut().set_register(REGISTER_INPUT, input);
332        store.data_mut().set_register(REGISTER_TX_CTX, tx_ctx_bytes);
333        store
334            .data_mut()
335            .set_register(REGISTER_WRITER, writer.into_bytes().into());
336
337        // Buffered registers
338        store.data_mut().set_register(
339            REGISTER_BLOCK_CTX,
340            self.block_ctx.clone().unwrap_or_default(),
341        );
342        store
343            .data_mut()
344            .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
345
346        // Call the actual function on the wasm side
347        store
348            .data_mut()
349            .prepare_exec(ActiveEntity::contract_tx(cid, true, tx_ctx))?;
350        let commit = match instance
351            .get_typed_func::<(), ()>(&mut store, contract_method)
352            .and_then(|func| func.call(&mut store, ()))
353        {
354            Ok(()) => {
355                // We commit it the way that we are told to
356                commit
357            }
358            Err(e) => {
359                warn!("{contract_method} failed with error: {e}");
360                // In this case we do not want to commit, so set it to `None`
361                None
362            }
363        };
364        let output = store.data().get_register(REGISTER_OUTPUT);
365        let _log = store.data_mut().finish_exec(commit);
366
367        // Return output events
368        match output {
369            Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
370            None => Ok(None),
371        }
372    }
373
374    /// Executes an action without commiting the state
375    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
376    pub fn perform_dry_run(
377        &mut self,
378        cid: &ContractId,
379        action: &CallAction,
380        writer: &BorderlessId,
381    ) -> Result<()> {
382        let input = action.to_bytes()?;
383
384        // TODO: Maybe do this a little bit more elaborate,
385        // and actually create a tx-ctx + block-ctx that are "bigger" than the last-applied tx.
386        // Otherwise we might end up with weird effects - as the users can use the timestamps inside their contracts
387        let tx_ctx = TxCtx::dummy();
388        let block_ctx = BlockCtx::dummy();
389        self.set_block(block_ctx.block_id, block_ctx.timestamp)?;
390        let _out = self.process_chain_tx(*cid, input, *writer, tx_ctx, None)?;
391        Ok(())
392    }
393
394    // --- NOTE: Maybe we should create a separate runtime for the HTTP handling ?
395
396    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
397    pub fn http_get_state(&mut self, cid: &ContractId, path: String) -> Result<(u16, Vec<u8>)> {
398        let (status, result) = self.process_http_call(cid, path, None, None, "http_get_state")?;
399        Ok((status, result))
400    }
401
402    /// Uses a POST request to parse and generate a [`CallAction`] object.
403    ///
404    /// The return type is a nested result. The outer result type should convert to a server error,
405    /// as it represents errors in the runtime itself.
406    /// The inner error type comes from the wasm code and contains the error status and message.
407    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
408    pub fn http_post_action(
409        &mut self,
410        cid: &ContractId,
411        path: String,
412        payload: Vec<u8>,
413        writer: &BorderlessId,
414    ) -> Result<std::result::Result<CallAction, (u16, String)>> {
415        let (status, result) =
416            self.process_http_call(cid, path, Some(payload), Some(writer), "http_post_action")?;
417        if status == 200 {
418            let action =
419                CallAction::from_bytes(&result).map_err(|_| ErrorKind::InvalidRegisterValue {
420                    register: "http-result",
421                    expected_type: "CallAction",
422                })?;
423            Ok(Ok(action))
424        } else {
425            let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
426                register: "http-result",
427                expected_type: "string",
428            })?;
429            Ok(Err((status, error)))
430        }
431    }
432
433    fn process_http_call(
434        &mut self,
435        cid: &ContractId,
436        path: String,
437        payload: Option<Vec<u8>>,
438        writer: Option<&BorderlessId>,
439        http_method: &'static str,
440    ) -> Result<(u16, Vec<u8>)> {
441        let (instance, mut store) = self
442            .contract_store
443            .get_contract(cid, &self.engine, &mut self.linker)?
444            .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
445
446        // Set registers
447        store
448            .data_mut()
449            .prepare_exec(ActiveEntity::contract_http(*cid))?;
450
451        store
452            .data_mut()
453            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
454
455        if let Some(payload) = payload {
456            store
457                .data_mut()
458                .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
459        }
460
461        if let Some(writer) = writer {
462            store
463                .data_mut()
464                .set_register(REGISTER_WRITER, writer.into_bytes().into());
465        }
466
467        // Buffered registers
468        store.data_mut().set_register(
469            REGISTER_BLOCK_CTX,
470            self.block_ctx.clone().unwrap_or_default(),
471        );
472        store
473            .data_mut()
474            .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
475
476        if let Err(e) = instance
477            .get_typed_func::<(), ()>(&mut store, http_method)
478            .and_then(|func| func.call(&mut store, ()))
479        {
480            error!("{http_method} failed with error: {e}");
481        }
482        // Get output
483        let status = store.data().get_register(REGISTER_OUTPUT_HTTP_STATUS);
484
485        let result = store.data().get_register(REGISTER_OUTPUT_HTTP_RESULT);
486
487        // Finish the execution ( and commit nothing )
488        let _log = store.data_mut().finish_exec(None)?;
489
490        // Parse status
491        let status = status.ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
492        let status_bytes = status
493            .try_into()
494            .map_err(|_| ErrorKind::InvalidRegisterValue {
495                register: "http-status",
496                expected_type: "u16",
497            })?;
498        let status = u16::from_be_bytes(status_bytes);
499
500        let result = result.ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
501        Ok((status, result))
502    }
503
504    /// Returns the symbols of the contract
505    pub fn get_symbols(&mut self, cid: &ContractId) -> Result<Option<Symbols>> {
506        let (instance, mut store) = self
507            .contract_store
508            .get_contract(cid, &self.engine, &mut self.linker)?
509            .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
510
511        store.data_mut().prepare_exec(ActiveEntity::None)?;
512
513        // In case the contract does not export any symbols, just return 'None'
514        if let Err(e) = instance
515            .get_typed_func::<(), ()>(&mut store, "get_symbols")
516            .and_then(|func| func.call(&mut store, ()))
517        {
518            error!("get_symbols failed with error: {e}");
519        }
520        let output = store.data().get_register(REGISTER_OUTPUT);
521        store.data_mut().finish_exec(None)?;
522
523        let bytes = match output {
524            Some(b) => b,
525            None => return Ok(None),
526        };
527        let symbols = Symbols::from_bytes(&bytes)?;
528        Ok(Some(symbols))
529    }
530
531    pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
532        self.contract_store.available_contracts()
533    }
534
535    /// Returns a copy of the underlying db handle
536    pub fn get_db(&self) -> S {
537        self.contract_store.get_db()
538    }
539}
540
541type Lock = Arc<Mutex<()>>;
542
543/// Global mutability lock for all contracts
544///
545/// Since we can only allow one mutable contract execution at a given time, we need a mechanism to ensure that.
546/// The `MutLock` ensures this on a per-contract basis. It holds `RwLock`s for all agents and provides threadsafe access.
547///
548/// The logic is similar but not identical to rusts ownership rules. While there can be only one read-write (mutable) execution,
549/// there can be multiple read-only (immutable) executions even if there is an ongoing read-write execution !
550/// The reason behind this is basically that read-only executions do not produce storage operations that would change the state in the database.
551/// In the `VmState`, all write operations are buffered until the execution is finished. If there would be two executions in parallel,
552/// we might end up commiting changes to a state, that has already changed under the hood - which is not what we want.
553/// However, if there is a writer thread, the readers do not care, and also the writer does not care about the readers.
554/// The readers will use the old state, until the new one is commited by the runtime.
555///
556/// Note: In contrast to [`borderless_runtime::rt::agent::MutLock`], this version uses only synchronous lock primitives.
557#[derive(Clone, Default)]
558pub struct MutLock {
559    map: Arc<Mutex<HashMap<ContractId, Lock>>>,
560}
561
562impl MutLock {
563    /// Returns the `RwLock` for the given contract.
564    ///
565    /// If the contract-id is unknown, a new lock is created.
566    pub fn get_lock(&self, cid: &ContractId) -> Lock {
567        let mut map = self.map.lock();
568        let lock = map.entry(*cid).or_default();
569        lock.clone()
570    }
571}
572
573fn check_module(engine: &Engine, module: &Module) -> Result<()> {
574    let functions = [
575        "process_transaction",
576        "process_introduction",
577        "process_revocation",
578        "http_get_state",
579        "http_post_action",
580        "parse_state",
581        "get_symbols",
582    ];
583    for func in functions {
584        let exp = module
585            .get_export(func)
586            .ok_or_else(|| ErrorKind::MissingExport { func })?;
587        if let ExternType::Func(func_type) = exp {
588            if !func_type.matches(&FuncType::new(engine, [], [])) {
589                return Err(ErrorKind::InvalidFuncType { func }.into());
590            }
591        } else {
592            return Err(ErrorKind::InvalidExport { func }.into());
593        }
594    }
595    Ok(())
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal module (text format) that provides every export required by `check_module`
    const ALL_EXPORTS: &str = r#"
(module
  ;; Declare the function `placeholder`
  (func $placeholder)

  ;; Export the functions so they can be called from outside the module
  (export "process_transaction" (func $placeholder))
  (export "process_introduction" (func $placeholder))
  (export "process_revocation" (func $placeholder))
  (export "http_get_state" (func $placeholder))
  (export "http_post_action" (func $placeholder))
  (export "parse_state" (func $placeholder))
  (export "get_symbols" (func $placeholder))
)
"#;

    /// The functions that must not be missing
    const REQUIRED_EXPORTS: [&str; 7] = [
        "process_transaction",
        "process_introduction",
        "process_revocation",
        "http_get_state",
        "http_post_action",
        "parse_state",
        "get_symbols",
    ];

    /// Returns `original` without the lines that contain `pattern` (lines are joined by `\n`)
    fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
        original
            .lines()
            .filter(|line| !line.contains(pattern))
            .collect::<Vec<_>>()
            .join("\n")
    }

    /// Builds an engine with the same settings as the runtime
    fn test_engine() -> Engine {
        let mut cfg = Config::new();
        cfg.cranelift_opt_level(wasmtime::OptLevel::Speed)
            .async_support(false);
        Engine::new(&cfg).unwrap()
    }

    #[test]
    fn missing_exports() {
        let engine = test_engine();

        // Dropping any single export must be detected
        for func in REQUIRED_EXPORTS {
            let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
            let module = Module::new(&engine, &wat_missing);
            assert!(module.is_ok());
            assert!(check_module(&engine, &module.unwrap()).is_err());
        }

        // The complete module passes the check
        let module = Module::new(&engine, ALL_EXPORTS);
        assert!(module.is_ok());
        assert!(check_module(&engine, &module.unwrap()).is_ok());
    }
}
663}