borderless_runtime/rt/
contract.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::common::{Introduction, Revocation, Symbols};
7use borderless::contracts::{BlockCtx, TxCtx};
8use borderless::events::Events;
9use borderless::{events::CallAction, ContractId};
10use borderless::{BlockIdentifier, BorderlessId};
11use borderless_kv_store::backend::lmdb::Lmdb;
12use borderless_kv_store::Db;
13use parking_lot::Mutex;
14use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module, Store};
15
16use super::{
17    code_store::CodeStore,
18    vm::{self, ContractCommit, VmState},
19};
20use crate::db::{
21    action_log::ActionRecord,
22    logger::{self, print_log_line},
23};
24use crate::log_shim::*;
25use crate::ACTION_TX_REL_SUB_DB;
26use crate::{
27    error::{ErrorKind, Result},
28    CONTRACT_SUB_DB,
29};
30
/// Shared handle to a [`Runtime`]: the runtime sits behind a `parking_lot` [`Mutex`] inside an [`Arc`].
///
/// See [`Runtime::into_shared`] for the conversion from an owned runtime.
pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
32
33/*
34 * Runtime TODO's:
35 * - use one global engine for all runtimes
36 * - make the Store a short lived object
37 * - use per-runtime caching (as an Instance is bound to the Store)
38 * - invalidate the cache, when re-creating the store
39 *
40 */
41
42pub struct Runtime<S = Lmdb>
43where
44    S: Db,
45{
46    linker: Linker<VmState<S>>,
47    store: Store<VmState<S>>,
48    engine: Engine,
49    contract_store: CodeStore<S>,
50    mutability_lock: MutLock,
51}
52
53impl<S: Db> Runtime<S> {
54    pub fn new(storage: &S, contract_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
55        let db_ptr = storage.create_sub_db(CONTRACT_SUB_DB)?;
56        let _ = storage.create_sub_db(ACTION_TX_REL_SUB_DB)?; // Also create the action relation db here
57        let start = Instant::now();
58        let state = VmState::new(storage.clone(), db_ptr);
59
60        let mut config = Config::new();
61        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
62        config.async_support(false);
63        let engine = Engine::new(&config)?;
64        // let module = Module::from_file(&engine, contract_path)?;
65
66        let mut linker: Linker<VmState<S>> = Linker::new(&engine);
67
68        // NOTE: We have to wrap the functions into a closure here, because they must be monomorphized
69        // (as a generic function cannot be made into a function pointer)
70        linker.func_wrap(
71            "env",
72            "print",
73            |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
74        )?;
75        linker.func_wrap(
76            "env",
77            "read_register",
78            |caller: Caller<'_, VmState<S>>, register_id, ptr| {
79                vm::read_register(caller, register_id, ptr)
80            },
81        )?;
82        linker.func_wrap(
83            "env",
84            "register_len",
85            |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
86        )?;
87        linker.func_wrap(
88            "env",
89            "write_register",
90            |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
91                vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
92            },
93        )?;
94        linker.func_wrap(
95            "env",
96            "storage_read",
97            |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
98                vm::storage_read(caller, base_key, sub_key, register_id)
99            },
100        )?;
101        linker.func_wrap(
102            "env",
103            "storage_write",
104            |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
105                vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
106            },
107        )?;
108        linker.func_wrap(
109            "env",
110            "storage_remove",
111            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
112                vm::storage_remove(caller, base_key, sub_key)
113            },
114        )?;
115        linker.func_wrap(
116            "env",
117            "storage_has_key",
118            |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
119                vm::storage_has_key(caller, base_key, sub_key)
120            },
121        )?;
122        linker.func_wrap(
123            "env",
124            "storage_cursor",
125            |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
126        )?;
127
128        // NOTE: Those functions introduce side-effects;
129        // they should only be used by us or during development of a contract
130        linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
131        linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
132            vm::tic(caller)
133        })?;
134        linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
135            vm::toc(caller)
136        })?;
137        linker.func_wrap("env", "rand", vm::rand)?;
138
139        let store = Store::new(&engine, state);
140
141        info!("Initialized runtime in: {:?}", start.elapsed());
142
143        Ok(Self {
144            linker,
145            store,
146            engine,
147            contract_store,
148            mutability_lock: lock,
149        })
150    }
151
152    pub fn into_shared(self) -> Arc<Mutex<Self>> {
153        Arc::new(Mutex::new(self))
154    }
155
156    /// Creates a new instance of the wasm module in our [`CodeStore`] for the given contract-id
157    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%contract_id), err))]
158    pub fn instantiate_contract(
159        &mut self,
160        contract_id: ContractId,
161        module_bytes: &[u8],
162    ) -> Result<()> {
163        let module = Module::new(&self.engine, module_bytes)?;
164        check_module(&self.engine, &module)?;
165        self.contract_store.insert_contract(contract_id, module)?;
166        Ok(())
167    }
168
169    /// Sets the currently active block
170    ///
171    /// This writes the [`BlockCtx`] to the dedicated register, so that the wasm side can query it.
172    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%block_id), err))]
173    pub fn set_block(&mut self, block_id: BlockIdentifier, block_timestamp: u64) -> Result<()> {
174        let ctx = BlockCtx {
175            block_id,
176            timestamp: block_timestamp,
177        };
178        self.store
179            .data_mut()
180            .set_register(REGISTER_BLOCK_CTX, ctx.to_bytes()?);
181        Ok(())
182    }
183
184    /// Sets the currently active executor
185    ///
186    /// This writes the [`BorderlessId`] of the executor to the dedicated register, so that the wasm side can query it.
187    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
188    pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
189        let bytes = executor_id.into_bytes().to_vec();
190        self.store.data_mut().set_register(REGISTER_EXECUTOR, bytes);
191        Ok(())
192    }
193
194    #[must_use = "You have to handle the output events of this function"]
195    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
196    pub fn process_transaction(
197        &mut self,
198        cid: &ContractId,
199        action: CallAction,
200        writer: &BorderlessId,
201        tx_ctx: TxCtx,
202    ) -> Result<Option<Events>> {
203        let input = action.to_bytes()?;
204        let events = self.process_chain_tx(
205            *cid,
206            input,
207            *writer,
208            tx_ctx.to_bytes()?,
209            ContractCommit::Action { action, tx_ctx },
210        )?;
211        Ok(events)
212    }
213
214    // TODO: Calling process introduction on an already introduced contract should generate an error
215    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %introduction.id, %writer), err))]
216    pub fn process_introduction(
217        &mut self,
218        introduction: Introduction,
219        writer: &BorderlessId,
220        tx_ctx: TxCtx,
221    ) -> Result<()> {
222        let cid = match introduction.id {
223            borderless::prelude::Id::Contract { contract_id } => contract_id,
224            borderless::prelude::Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
225        };
226        // NOTE: The input for the introduction is not the introduction, but only the initial state!
227        // The introduction itself is commited by the VmState
228        let initial_state = introduction.initial_state.to_string().into_bytes();
229        self.process_chain_tx(
230            cid,
231            initial_state,
232            *writer,
233            tx_ctx.to_bytes()?,
234            ContractCommit::Introduction {
235                introduction,
236                tx_ctx,
237            },
238        )?;
239        Ok(())
240    }
241
242    // TODO: Calling process introduction on an already revoked contract should generate an error
243    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %revocation.id, %writer), err))]
244    pub fn process_revocation(
245        &mut self,
246        revocation: Revocation,
247        writer: &BorderlessId,
248        tx_ctx: TxCtx,
249    ) -> Result<()> {
250        let input = revocation.to_bytes()?;
251        let cid = match revocation.id {
252            borderless::prelude::Id::Contract { contract_id } => contract_id,
253            borderless::prelude::Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
254        };
255        self.process_chain_tx(
256            cid,
257            input,
258            *writer,
259            tx_ctx.to_bytes()?,
260            ContractCommit::Revocation { revocation, tx_ctx },
261        )?;
262        Ok(())
263    }
264
265    // TODO: Return Option<Events> to have None or use Events::default() ?
266    /// Abstraction over all possible chain transactions
267    ///
268    /// In case of an error, the `VmState` is reset by this function.
269    fn process_chain_tx(
270        &mut self,
271        cid: ContractId,
272        input: Vec<u8>,
273        writer: BorderlessId,
274        tx_ctx_bytes: Vec<u8>,
275        commit: ContractCommit,
276    ) -> Result<Option<Events>> {
277        let instance = self
278            .contract_store
279            .get_contract(&cid, &self.engine, &mut self.store, &mut self.linker)?
280            .ok_or_else(|| ErrorKind::MissingContract { cid })?;
281
282        let mtx = self.mutability_lock.get_lock(&cid);
283        let _guard = mtx.lock();
284
285        let contract_method = match &commit {
286            ContractCommit::Action { .. } => "process_transaction",
287            ContractCommit::Introduction { .. } => "process_introduction",
288            ContractCommit::Revocation { .. } => "process_revocation",
289        };
290
291        // Prepare registers
292        self.store.data_mut().set_register(REGISTER_INPUT, input);
293        self.store
294            .data_mut()
295            .set_register(REGISTER_TX_CTX, tx_ctx_bytes);
296        self.store
297            .data_mut()
298            .set_register(REGISTER_WRITER, writer.into_bytes().into());
299
300        // Call the actual function on the wasm side
301        self.store.data_mut().begin_mutable_exec(cid)?;
302        match instance
303            .get_typed_func::<(), ()>(&mut self.store, contract_method)
304            .and_then(|func| func.call(&mut self.store, ()))
305        {
306            Ok(()) => self.store.data_mut().finish_mutable_exec(commit)?,
307            Err(e) => {
308                warn!("{contract_method} failed with error: {e}");
309                // NOTE: It is okay to abort the execution here with the finish_immutable_exec function,
310                // because we only get here, if the wasm execution has failed. Therefore there are no
311                // logs or actions to be commited to the database. We simply need this line to 'reset' the VmState for the next execution.
312                let logs = self.store.data_mut().finish_immutable_exec()?;
313                for l in logs {
314                    print_log_line(l);
315                }
316            }
317        }
318
319        // Return output events
320        match self.store.data().get_register(REGISTER_OUTPUT) {
321            Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
322            None => Ok(None),
323        }
324    }
325
326    /// Executes an action without commiting the state
327    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
328    pub fn perform_dry_run(
329        &mut self,
330        cid: &ContractId,
331        action: &CallAction,
332        writer: &BorderlessId,
333    ) -> Result<()> {
334        let input = action.to_bytes()?;
335        let tx_ctx = TxCtx::dummy().to_bytes()?;
336
337        let instance = self
338            .contract_store
339            .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
340            .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
341
342        self.store.data_mut().begin_immutable_exec(*cid)?;
343
344        // Prepare registers
345        self.store.data_mut().set_register(REGISTER_INPUT, input);
346        self.store.data_mut().set_register(REGISTER_TX_CTX, tx_ctx);
347        self.store
348            .data_mut()
349            .set_register(REGISTER_WRITER, writer.into_bytes().into());
350
351        // Call the actual function on the wasm side
352        if let Err(e) = instance
353            .get_typed_func::<(), ()>(&mut self.store, "process_transaction")
354            .and_then(|func| func.call(&mut self.store, ()))
355        {
356            warn!("dry-run of process_transaction failed with error: {e}");
357        }
358        // Finish the execution
359        self.store.data_mut().finish_immutable_exec()?;
360        Ok(())
361    }
362
363    // --- NOTE: Maybe we should create a separate runtime for the HTTP handling ?
364
365    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
366    pub fn http_get_state(&mut self, cid: &ContractId, path: String) -> Result<(u16, Vec<u8>)> {
367        let instance = self
368            .contract_store
369            .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
370            .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
371
372        self.store.data_mut().begin_immutable_exec(*cid)?;
373
374        self.store
375            .data_mut()
376            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
377
378        if let Err(e) = instance
379            .get_typed_func::<(), ()>(&mut self.store, "http_get_state")
380            .and_then(|func| func.call(&mut self.store, ()))
381        {
382            warn!("http_get_state failed with error: {e}");
383        }
384        // Finish the execution
385        let log = self.store.data_mut().finish_immutable_exec()?;
386
387        let status = self
388            .store
389            .data()
390            .get_register(REGISTER_OUTPUT_HTTP_STATUS)
391            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
392        let status = u16::from_be_bytes(status.try_into().unwrap());
393
394        let result = self
395            .store
396            .data()
397            .get_register(REGISTER_OUTPUT_HTTP_RESULT)
398            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
399
400        // Print the log
401        for l in log {
402            logger::print_log_line(l);
403        }
404        Ok((status, result))
405    }
406
407    /// Uses a POST request to parse and generate a [`CallAction`] object.
408    ///
409    /// The return type is a nested result. The outer result type should convert to a server error,
410    /// as it represents errors in the runtime itself.
411    /// The inner error type comes from the wasm code and contains the error status and message.
412    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
413    pub fn http_post_action(
414        &mut self,
415        cid: &ContractId,
416        path: String,
417        payload: Vec<u8>,
418        writer: &BorderlessId,
419    ) -> Result<std::result::Result<CallAction, (u16, String)>> {
420        let instance = self
421            .contract_store
422            .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
423            .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
424
425        self.store.data_mut().begin_immutable_exec(*cid)?;
426
427        self.store
428            .data_mut()
429            .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
430
431        self.store
432            .data_mut()
433            .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
434
435        self.store
436            .data_mut()
437            .set_register(REGISTER_WRITER, writer.into_bytes().into());
438
439        if let Err(e) = instance
440            .get_typed_func::<(), ()>(&mut self.store, "http_post_action")
441            .and_then(|func| func.call(&mut self.store, ()))
442        {
443            error!("http_post_action failed with error: {e}");
444        }
445        // Finish the execution
446        let log = self.store.data_mut().finish_immutable_exec()?;
447
448        let status = self
449            .store
450            .data()
451            .get_register(REGISTER_OUTPUT_HTTP_STATUS)
452            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
453        let status = u16::from_be_bytes(status.try_into().unwrap());
454
455        let result = self
456            .store
457            .data()
458            .get_register(REGISTER_OUTPUT_HTTP_RESULT)
459            .ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
460
461        // Print the log
462        for l in log {
463            logger::print_log_line(l);
464        }
465
466        if status == 200 {
467            let action = CallAction::from_bytes(&result)?;
468            Ok(Ok(action))
469        } else {
470            let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
471                register: "http-result",
472                expected_type: "string",
473            })?;
474            Ok(Err((status, error)))
475        }
476    }
477
478    /// Returns the symbols of the contract
479    pub fn get_symbols(&mut self, cid: &ContractId) -> Result<Option<Symbols>> {
480        let instance = self
481            .contract_store
482            .get_contract(cid, &self.engine, &mut self.store, &mut self.linker)?
483            .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
484
485        self.store.data_mut().begin_immutable_exec(*cid)?;
486
487        // In case the contract does not export any symbols, just return 'None'
488        if let Err(e) = instance
489            .get_typed_func::<(), ()>(&mut self.store, "get_symbols")
490            .and_then(|func| func.call(&mut self.store, ()))
491        {
492            error!("get_symbols failed with error: {e}");
493        }
494        self.store.data_mut().finish_immutable_exec()?;
495
496        let bytes = match self.store.data().get_register(REGISTER_OUTPUT) {
497            Some(b) => b,
498            None => return Ok(None),
499        };
500        let symbols = Symbols::from_bytes(&bytes)?;
501        Ok(Some(symbols))
502    }
503
504    pub fn read_action(&self, cid: &ContractId, idx: usize) -> Result<Option<ActionRecord>> {
505        self.store.data().read_action(cid, idx)
506    }
507
508    pub fn len_actions(&self, cid: &ContractId) -> Result<u64> {
509        self.store.data().len_actions(cid)
510    }
511
512    pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
513        self.contract_store.available_contracts()
514    }
515}
516
517type Lock = Arc<Mutex<()>>;
518
519/// Global mutability lock for all contracts
520///
521/// Since we can only allow one mutable contract execution at a given time, we need a mechanism to ensure that.
522/// The `MutLock` ensures this on a per-contract basis. It holds `RwLock`s for all agents and provides threadsafe access.
523///
524/// The logic is similar but not identical to rusts ownership rules. While there can be only one read-write (mutable) execution,
525/// there can be multiple read-only (immutable) executions even if there is an ongoing read-write execution !
526/// The reason behind this is basically that read-only executions do not produce storage operations that would change the state in the database.
527/// In the `VmState`, all write operations are buffered until the execution is finished. If there would be two executions in parallel,
528/// we might end up commiting changes to a state, that has already changed under the hood - which is not what we want.
529/// However, if there is a writer thread, the readers do not care, and also the writer does not care about the readers.
530/// The readers will use the old state, until the new one is commited by the runtime.
531///
532/// Note: In contrast to [`borderless_runtime::rt::agent::MutLock`], this version uses only synchronous lock primitives.
533#[derive(Clone, Default)]
534pub struct MutLock {
535    map: Arc<Mutex<HashMap<ContractId, Lock>>>,
536}
537
538impl MutLock {
539    /// Returns the `RwLock` for the given contract.
540    ///
541    /// If the contract-id is unknown, a new lock is created.
542    pub fn get_lock(&self, cid: &ContractId) -> Lock {
543        let mut map = self.map.lock();
544        let lock = map.entry(*cid).or_default();
545        lock.clone()
546    }
547}
548
549fn check_module(engine: &Engine, module: &Module) -> Result<()> {
550    let functions = [
551        "process_transaction",
552        "process_introduction",
553        "process_revocation",
554        "http_get_state",
555        "http_post_action",
556        "get_symbols",
557    ];
558    for func in functions {
559        let exp = module
560            .get_export(func)
561            .ok_or_else(|| ErrorKind::MissingExport { func })?;
562        if let ExternType::Func(func_type) = exp {
563            if !func_type.matches(&FuncType::new(engine, [], [])) {
564                return Err(ErrorKind::InvalidFuncType { func }.into());
565            }
566        } else {
567            return Err(ErrorKind::InvalidExport { func }.into());
568        }
569    }
570    Ok(())
571}
572
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal module (text format) that exports every function required by `check_module`.
    const ALL_EXPORTS: &str = r#"
(module
  ;; Declare the function `placeholder`
  (func $placeholder)

  ;; Export the functions so they can be called from outside the module
  (export "process_transaction" (func $placeholder))
  (export "process_introduction" (func $placeholder))
  (export "process_revocation" (func $placeholder))
  (export "http_get_state" (func $placeholder))
  (export "http_post_action" (func $placeholder))
  (export "get_symbols" (func $placeholder))
)
"#;

    /// Returns `original` without the lines that contain `pattern` (lines are re-joined with `\n`).
    fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
        original
            .lines()
            .filter(|line| !line.contains(pattern))
            .collect::<Vec<_>>()
            .join("\n")
    }

    #[test]
    fn missing_exports() {
        let mut config = Config::new();
        config
            .cranelift_opt_level(wasmtime::OptLevel::Speed)
            .async_support(false);
        let engine = Engine::new(&config).unwrap();

        // These are the functions, that must not be missing
        let functions = [
            "process_transaction",
            "process_introduction",
            "process_revocation",
            "http_get_state",
            "http_post_action",
            "get_symbols",
        ];

        // Dropping any single export must still compile, but must be rejected by the check
        for func in functions {
            let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
            let compiled = Module::new(&engine, &wat_missing);
            assert!(compiled.is_ok());
            let checked = check_module(&engine, &compiled.unwrap());
            assert!(checked.is_err());
        }

        // The complete module passes the check
        let compiled = Module::new(&engine, &ALL_EXPORTS);
        assert!(compiled.is_ok());

        let checked = check_module(&engine, &compiled.unwrap());
        assert!(checked.is_ok());
    }
}
636}