1use std::sync::Arc;
2use std::time::Instant;
3
4use ahash::HashMap;
5use borderless::__private::registers::*;
6use borderless::common::{Introduction, Revocation, Symbols};
7use borderless::contracts::{BlockCtx, TxCtx};
8use borderless::events::Events;
9use borderless::{events::CallAction, ContractId};
10use borderless::{BlockIdentifier, BorderlessId};
11use borderless_kv_store::backend::lmdb::Lmdb;
12use borderless_kv_store::Db;
13use http::StatusCode;
14use parking_lot::Mutex;
15use wasmtime::{Caller, Config, Engine, ExternType, FuncType, Linker, Module};
16
17use super::vm::{ActiveEntity, Commit};
18use super::{
19 code_store::CodeStore,
20 vm::{self, VmState},
21};
22use crate::db::controller::Controller;
23use crate::{
24 error::{ErrorKind, Result},
25 CONTRACT_SUB_DB,
26};
27use crate::{log_shim::*, LEDGER_SUB_DB};
28use crate::{ACTION_TX_REL_SUB_DB, SUBSCRIPTION_REL_SUB_DB};
29
30pub type SharedRuntime<S> = Arc<Mutex<Runtime<S>>>;
31
32pub struct Runtime<S = Lmdb>
43where
44 S: Db,
45{
46 linker: Linker<VmState<S>>,
47 engine: Engine,
48 contract_store: CodeStore<S>,
49 mutability_lock: MutLock,
50 block_ctx: Option<Vec<u8>>,
51 executor: Option<Vec<u8>>,
52}
53
54impl<S: Db> Runtime<S> {
55 pub fn new(storage: &S, contract_store: CodeStore<S>, lock: MutLock) -> Result<Self> {
56 let start = Instant::now();
57 let _ = storage.create_sub_db(CONTRACT_SUB_DB)?;
59 let _ = storage.create_sub_db(ACTION_TX_REL_SUB_DB)?;
60 let _ = storage.create_sub_db(LEDGER_SUB_DB)?;
61 let _ = storage.create_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
62
63 let mut config = Config::new();
65 config.cranelift_opt_level(wasmtime::OptLevel::Speed);
66 config.async_support(false);
67 let engine = Engine::new(&config)?;
68
69 let mut linker: Linker<VmState<S>> = Linker::new(&engine);
70
71 linker.func_wrap(
74 "env",
75 "print",
76 |caller: Caller<'_, VmState<S>>, ptr, len, level| vm::print(caller, ptr, len, level),
77 )?;
78 linker.func_wrap(
80 "env",
81 "read_register",
82 |caller: Caller<'_, VmState<S>>, register_id, ptr| {
83 vm::read_register(caller, register_id, ptr)
84 },
85 )?;
86 linker.func_wrap(
87 "env",
88 "register_len",
89 |caller: Caller<'_, VmState<S>>, register_id| vm::register_len(caller, register_id),
90 )?;
91 linker.func_wrap(
92 "env",
93 "write_register",
94 |caller: Caller<'_, VmState<S>>, register_id, wasm_ptr, wasm_ptr_len| {
95 vm::write_register(caller, register_id, wasm_ptr, wasm_ptr_len)
96 },
97 )?;
98 linker.func_wrap(
100 "env",
101 "storage_read",
102 |caller: Caller<'_, VmState<S>>, base_key, sub_key, register_id| {
103 vm::storage_read(caller, base_key, sub_key, register_id)
104 },
105 )?;
106 linker.func_wrap(
107 "env",
108 "storage_write",
109 |caller: Caller<'_, VmState<S>>, base_key, sub_key, value_ptr, value_len| {
110 vm::storage_write(caller, base_key, sub_key, value_ptr, value_len)
111 },
112 )?;
113 linker.func_wrap(
114 "env",
115 "storage_remove",
116 |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
117 vm::storage_remove(caller, base_key, sub_key)
118 },
119 )?;
120 linker.func_wrap(
121 "env",
122 "storage_has_key",
123 |caller: Caller<'_, VmState<S>>, base_key, sub_key| {
124 vm::storage_has_key(caller, base_key, sub_key)
125 },
126 )?;
127 linker.func_wrap(
128 "env",
129 "storage_cursor",
130 |caller: Caller<'_, VmState<S>>, base_key| vm::storage_cursor(caller, base_key),
131 )?;
132 linker.func_wrap("env", "storage_gen_sub_key", vm::storage_gen_sub_key)?;
133
134 linker.func_wrap("env", "timestamp", |caller: Caller<'_, VmState<S>>| {
136 vm::timestamp(caller)
137 })?;
138
139 linker.func_wrap(
141 "env",
142 "create_ledger_entry",
143 |caller: Caller<'_, VmState<S>>, wasm_ptr, wasm_len| {
144 vm::create_ledger_entry(caller, wasm_ptr, wasm_len)
145 },
146 )?;
147
148 linker.func_wrap("env", "tic", |caller: Caller<'_, VmState<S>>| {
151 vm::tic(caller)
152 })?;
153 linker.func_wrap("env", "toc", |caller: Caller<'_, VmState<S>>| {
154 vm::toc(caller)
155 })?;
156 linker.func_wrap("env", "rand", vm::rand)?;
157
158 info!("Initialized runtime in: {:?}", start.elapsed());
159
160 Ok(Self {
161 linker,
162 engine,
163 contract_store,
164 mutability_lock: lock,
165 block_ctx: None,
166 executor: None,
167 })
168 }
169
170 pub fn into_shared(self) -> Arc<Mutex<Self>> {
171 Arc::new(Mutex::new(self))
172 }
173
174 pub fn contract_revoked(&self, aid: &ContractId) -> Result<bool> {
176 let db = self.get_db();
177 let controller = Controller::new(&db);
178 controller.contract_revoked(aid)
179 }
180
181 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%contract_id), err))]
183 pub fn instantiate_contract(
184 &mut self,
185 contract_id: ContractId,
186 module_bytes: &[u8],
187 ) -> Result<()> {
188 let module = Module::new(&self.engine, module_bytes)?;
189 check_module(&self.engine, &module)?;
190 self.contract_store.insert_contract(contract_id, module)?;
191 Ok(())
192 }
193
194 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%block_id), err))]
198 pub fn set_block(&mut self, block_id: BlockIdentifier, block_timestamp: u64) -> Result<()> {
199 let ctx = BlockCtx {
200 block_id,
201 timestamp: block_timestamp,
202 };
203 self.block_ctx = Some(ctx.to_bytes()?);
204 Ok(())
205 }
206
207 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, err))]
209 pub fn check_module_and_state(
210 &mut self,
211 module_bytes: Vec<u8>,
212 state: serde_json::Value,
213 ) -> Result<(bool, Vec<String>)> {
214 let module = Module::new(&self.engine, module_bytes)?;
215 check_module(&self.engine, &module)?;
216 let mut store = self.contract_store.create_store(&self.engine)?;
217 let instance = self.linker.instantiate(&mut store, &module)?;
218
219 store
221 .data_mut()
222 .set_register(REGISTER_INPUT, state.to_string().into_bytes());
223
224 store.data_mut().prepare_exec(ActiveEntity::None)?;
226 let success = match instance
227 .get_typed_func::<(), ()>(&mut store, "parse_state")
228 .and_then(|func| func.call(&mut store, ()))
229 {
230 Ok(()) => true,
231 Err(_e) => false,
232 };
233 let log = store.data_mut().finish_exec(None)?;
234 Ok((success, log.into_iter().map(|l| l.msg).collect()))
235 }
236
237 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(%executor_id), err))]
242 pub fn set_executor(&mut self, executor_id: BorderlessId) -> Result<()> {
243 let bytes = executor_id.into_bytes().to_vec();
244 self.executor = Some(bytes);
245 Ok(())
246 }
247
248 #[must_use = "You have to handle the output events of this function"]
249 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
250 pub fn process_transaction(
251 &mut self,
252 cid: &ContractId,
253 action: CallAction,
254 writer: &BorderlessId,
255 tx_ctx: TxCtx,
256 ) -> Result<Option<Events>> {
257 let input = action.to_bytes()?;
258 let events =
259 self.process_chain_tx(*cid, input, *writer, tx_ctx, Some(Commit::Action(action)))?;
260 Ok(events)
261 }
262
263 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %introduction.id, %writer), err))]
264 pub fn process_introduction(
265 &mut self,
266 introduction: Introduction,
267 writer: &BorderlessId,
268 tx_ctx: TxCtx,
269 ) -> Result<()> {
270 let cid = match introduction.id {
271 borderless::prelude::Id::Contract { contract_id } => contract_id,
272 borderless::prelude::Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
273 };
274 let initial_state = introduction.initial_state.to_string().into_bytes();
277 self.process_chain_tx(
278 cid,
279 initial_state,
280 *writer,
281 tx_ctx,
282 Some(Commit::Introduction(introduction)),
283 )?;
284 Ok(())
285 }
286
287 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %revocation.id, %writer), err))]
289 pub fn process_revocation(
290 &mut self,
291 revocation: Revocation,
292 writer: &BorderlessId,
293 tx_ctx: TxCtx,
294 ) -> Result<()> {
295 let input = revocation.to_bytes()?;
296 let cid = match revocation.id {
297 borderless::prelude::Id::Contract { contract_id } => contract_id,
298 borderless::prelude::Id::Agent { .. } => return Err(ErrorKind::InvalidIdType.into()),
299 };
300 self.process_chain_tx(
301 cid,
302 input,
303 *writer,
304 tx_ctx,
305 Some(Commit::Revocation(revocation)),
306 )?;
307 Ok(())
308 }
309
310 fn process_chain_tx(
315 &mut self,
316 cid: ContractId,
317 input: Vec<u8>,
318 writer: BorderlessId,
319 tx_ctx: TxCtx,
320 commit: Option<Commit>,
321 ) -> Result<Option<Events>> {
322 let tx_ctx_bytes = tx_ctx.to_bytes()?;
323 let (instance, mut store) = self
324 .contract_store
325 .get_contract(&cid, &self.engine, &mut self.linker)?
326 .ok_or_else(|| ErrorKind::MissingContract { cid })?;
327
328 if self.contract_revoked(&cid)? {
329 return Err(ErrorKind::RevokedContract { cid }.into());
330 }
331
332 let mtx = self.mutability_lock.get_lock(&cid);
333 let _guard = mtx.lock();
334
335 let contract_method = match &commit {
336 Some(Commit::Action(_)) => "process_transaction",
337 Some(Commit::Introduction(_)) => "process_introduction",
338 Some(Commit::Revocation(_)) => "process_revocation",
339 Some(Commit::Other) => panic!("Commit::Other is reserved for actions"),
340 None => "process_transaction", };
342
343 store.data_mut().set_register(REGISTER_INPUT, input);
345 store.data_mut().set_register(REGISTER_TX_CTX, tx_ctx_bytes);
346 store
347 .data_mut()
348 .set_register(REGISTER_WRITER, writer.into_bytes().into());
349
350 store.data_mut().set_register(
352 REGISTER_BLOCK_CTX,
353 self.block_ctx.clone().unwrap_or_default(),
354 );
355 store
356 .data_mut()
357 .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
358
359 store
361 .data_mut()
362 .prepare_exec(ActiveEntity::contract_tx(cid, true, tx_ctx))?;
363 let commit = match instance
364 .get_typed_func::<(), ()>(&mut store, contract_method)
365 .and_then(|func| func.call(&mut store, ()))
366 {
367 Ok(()) => {
368 commit
370 }
371 Err(e) => {
372 warn!("{contract_method} failed with error: {e}");
373 None
375 }
376 };
377 let output = store.data().get_register(REGISTER_OUTPUT);
378 let _log = store.data_mut().finish_exec(commit);
379
380 match output {
382 Some(bytes) => Ok(Some(Events::from_bytes(&bytes)?)),
383 None => Ok(None),
384 }
385 }
386
387 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %writer), err))]
389 pub fn perform_dry_run(
390 &mut self,
391 cid: &ContractId,
392 action: &CallAction,
393 writer: &BorderlessId,
394 ) -> Result<()> {
395 let input = action.to_bytes()?;
396
397 let tx_ctx = TxCtx::dummy();
401 let block_ctx = BlockCtx::dummy();
402 self.set_block(block_ctx.block_id, block_ctx.timestamp)?;
403 let _out = self.process_chain_tx(*cid, input, *writer, tx_ctx, None)?;
404 Ok(())
405 }
406
407 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
410 pub fn http_get_state(&mut self, cid: &ContractId, path: String) -> Result<(u16, Vec<u8>)> {
411 let (status, result) = self.process_http_call(cid, path, None, None, "http_get_state")?;
412 Ok((status, result))
413 }
414
415 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(contract_id = %cid, %path), err))]
421 pub fn http_post_action(
422 &mut self,
423 cid: &ContractId,
424 path: String,
425 payload: Vec<u8>,
426 writer: &BorderlessId,
427 ) -> Result<std::result::Result<CallAction, (u16, String)>> {
428 if self.contract_revoked(cid)? {
430 return Ok(Err((
431 StatusCode::BAD_REQUEST.as_u16(),
432 ErrorKind::RevokedContract { cid: *cid }.to_string(),
433 )));
434 }
435 let (status, result) =
436 self.process_http_call(cid, path, Some(payload), Some(writer), "http_post_action")?;
437 if status == 200 {
438 let action =
439 CallAction::from_bytes(&result).map_err(|_| ErrorKind::InvalidRegisterValue {
440 register: "http-result",
441 expected_type: "CallAction",
442 })?;
443 Ok(Ok(action))
444 } else {
445 let error = String::from_utf8(result).map_err(|_| ErrorKind::InvalidRegisterValue {
446 register: "http-result",
447 expected_type: "string",
448 })?;
449 Ok(Err((status, error)))
450 }
451 }
452
453 fn process_http_call(
454 &mut self,
455 cid: &ContractId,
456 path: String,
457 payload: Option<Vec<u8>>,
458 writer: Option<&BorderlessId>,
459 http_method: &'static str,
460 ) -> Result<(u16, Vec<u8>)> {
461 let (instance, mut store) = self
462 .contract_store
463 .get_contract(cid, &self.engine, &mut self.linker)?
464 .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
465
466 store
468 .data_mut()
469 .prepare_exec(ActiveEntity::contract_http(*cid))?;
470
471 store
472 .data_mut()
473 .set_register(REGISTER_INPUT_HTTP_PATH, path.into_bytes());
474
475 if let Some(payload) = payload {
476 store
477 .data_mut()
478 .set_register(REGISTER_INPUT_HTTP_PAYLOAD, payload);
479 }
480
481 if let Some(writer) = writer {
482 store
483 .data_mut()
484 .set_register(REGISTER_WRITER, writer.into_bytes().into());
485 }
486
487 store.data_mut().set_register(
489 REGISTER_BLOCK_CTX,
490 self.block_ctx.clone().unwrap_or_default(),
491 );
492 store
493 .data_mut()
494 .set_register(REGISTER_EXECUTOR, self.executor.clone().unwrap_or_default());
495
496 if let Err(e) = instance
497 .get_typed_func::<(), ()>(&mut store, http_method)
498 .and_then(|func| func.call(&mut store, ()))
499 {
500 error!("{http_method} failed with error: {e}");
501 }
502 let status = store.data().get_register(REGISTER_OUTPUT_HTTP_STATUS);
504
505 let result = store.data().get_register(REGISTER_OUTPUT_HTTP_RESULT);
506
507 let _log = store.data_mut().finish_exec(None)?;
509
510 let status = status.ok_or_else(|| ErrorKind::MissingRegisterValue("http-status"))?;
512 let status_bytes = status
513 .try_into()
514 .map_err(|_| ErrorKind::InvalidRegisterValue {
515 register: "http-status",
516 expected_type: "u16",
517 })?;
518 let status = u16::from_be_bytes(status_bytes);
519
520 let result = result.ok_or_else(|| ErrorKind::MissingRegisterValue("http-result"))?;
521 Ok((status, result))
522 }
523
524 pub fn get_symbols(&mut self, cid: &ContractId) -> Result<Option<Symbols>> {
526 let (instance, mut store) = self
527 .contract_store
528 .get_contract(cid, &self.engine, &mut self.linker)?
529 .ok_or_else(|| ErrorKind::MissingContract { cid: *cid })?;
530
531 store.data_mut().prepare_exec(ActiveEntity::None)?;
532
533 if let Err(e) = instance
535 .get_typed_func::<(), ()>(&mut store, "get_symbols")
536 .and_then(|func| func.call(&mut store, ()))
537 {
538 error!("get_symbols failed with error: {e}");
539 }
540 let output = store.data().get_register(REGISTER_OUTPUT);
541 store.data_mut().finish_exec(None)?;
542
543 let bytes = match output {
544 Some(b) => b,
545 None => return Ok(None),
546 };
547 let symbols = Symbols::from_bytes(&bytes)?;
548 Ok(Some(symbols))
549 }
550
551 pub fn available_contracts(&self) -> Result<Vec<ContractId>> {
552 self.contract_store.available_contracts()
553 }
554
555 pub fn get_db(&self) -> S {
557 self.contract_store.get_db()
558 }
559}
560
561type Lock = Arc<Mutex<()>>;
562
563#[derive(Clone, Default)]
578pub struct MutLock {
579 map: Arc<Mutex<HashMap<ContractId, Lock>>>,
580}
581
582impl MutLock {
583 pub fn get_lock(&self, cid: &ContractId) -> Lock {
587 let mut map = self.map.lock();
588 let lock = map.entry(*cid).or_default();
589 lock.clone()
590 }
591}
592
593fn check_module(engine: &Engine, module: &Module) -> Result<()> {
594 let functions = [
595 "process_transaction",
596 "process_introduction",
597 "process_revocation",
598 "http_get_state",
599 "http_post_action",
600 "parse_state",
601 "get_symbols",
602 ];
603 for func in functions {
604 let exp = module
605 .get_export(func)
606 .ok_or_else(|| ErrorKind::MissingExport { func })?;
607 if let ExternType::Func(func_type) = exp {
608 if !func_type.matches(&FuncType::new(engine, [], [])) {
609 return Err(ErrorKind::InvalidFuncType { func }.into());
610 }
611 } else {
612 return Err(ErrorKind::InvalidExport { func }.into());
613 }
614 }
615 Ok(())
616}
617
#[cfg(test)]
mod tests {
    use super::*;

    /// WAT module that exports every function required by `check_module`.
    const ALL_EXPORTS: &str = r#"
(module
    ;; Declare the function `placeholder`
    (func $placeholder)

    ;; Export the functions so they can be called from outside the module
    (export "process_transaction" (func $placeholder))
    (export "process_introduction" (func $placeholder))
    (export "process_revocation" (func $placeholder))
    (export "http_get_state" (func $placeholder))
    (export "http_post_action" (func $placeholder))
    (export "parse_state" (func $placeholder))
    (export "get_symbols" (func $placeholder))
)
"#;

    /// Returns `original` without the lines that contain `pattern`.
    fn remove_line_with_pattern(original: &str, pattern: &str) -> String {
        original
            .lines()
            .filter(|line| !line.contains(pattern))
            .collect::<Vec<_>>()
            .join("\n")
    }

    #[test]
    fn missing_exports() {
        let mut config = Config::new();
        config.cranelift_opt_level(wasmtime::OptLevel::Speed);
        config.async_support(false);
        let engine = Engine::new(&config).unwrap();

        let functions = [
            "process_transaction",
            "process_introduction",
            "process_revocation",
            "http_get_state",
            "http_post_action",
            "parse_state",
            "get_symbols",
        ];

        // Dropping any single export must make the check fail.
        for func in functions {
            let wat_missing = remove_line_with_pattern(ALL_EXPORTS, func);
            let compiled = Module::new(&engine, &wat_missing);
            assert!(compiled.is_ok());
            assert!(check_module(&engine, &compiled.unwrap()).is_err());
        }

        // The complete module passes.
        let compiled = Module::new(&engine, &ALL_EXPORTS);
        assert!(compiled.is_ok());
        assert!(check_module(&engine, &compiled.unwrap()).is_ok());
    }
}