1use kv::KvHost;
2use ledgers::LedgerHost;
3use logging::LoggerHost;
4use router::Router;
5use sign::SignerHost;
6use std::{collections::HashMap, io::Read, path::Path, sync::Arc};
7use thiserror::Error;
8use tokio::sync::Mutex;
9use tracing::{debug, info, warn};
10use utxorpc::spec::sync::BlockRef;
11use wasmtime::component::HasSelf;
12
13pub mod wit {
14 wasmtime::component::bindgen!({
15 path: "./wit",
16 async: true,
17 tracing: true,
18 });
19}
20
21mod metrics;
22mod router;
23mod store;
24
25pub mod drivers;
27pub mod http;
28pub mod kv;
29pub mod ledgers;
30pub mod logging;
31pub mod sign;
32pub mod submit;
33
34pub use store::Store;
35pub use wit::Response;
36
/// Identifier under which a worker is registered with the runtime.
pub type WorkerId = String;
38
39#[derive(Error, Debug)]
40pub enum Error {
41 #[error("wasm error {0}")]
42 Wasm(wasmtime::Error),
43
44 #[error("store error {0}")]
45 Store(Box<redb::Error>),
46
47 #[error("worker not found '{0}'")]
48 WorkerNotFound(WorkerId),
49
50 #[error("worker failed to handle event (code: '{0}', message: '{1}')")]
51 Handle(u32, String),
52
53 #[error("no target available to solve request")]
54 NoTarget,
55
56 #[error("more than one target available to solve request")]
57 AmbiguousTarget,
58
59 #[error("address in block failed to parse")]
60 BadAddress(pallas::ledger::addresses::Error),
61
62 #[error("ledger error: {0}")]
63 Ledger(String),
64
65 #[error("config error: {0}")]
66 Config(String),
67
68 #[error("driver error: {0}")]
69 Driver(String),
70
71 #[error("failed to interact with WASM object: {0}")]
72 ObjectStoreError(object_store::Error),
73
74 #[error("failed to interact with local WASM file: {0}")]
75 IoError(std::io::Error),
76
77 #[error("kv error {0}")]
78 KvError(String),
79}
80
81impl From<wasmtime::Error> for Error {
82 fn from(value: wasmtime::Error) -> Self {
83 Self::Wasm(value)
84 }
85}
86
87impl From<redb::Error> for Error {
88 fn from(value: redb::Error) -> Self {
89 Self::Store(Box::new(value))
90 }
91}
92
93impl From<redb::DatabaseError> for Error {
94 fn from(value: redb::DatabaseError) -> Self {
95 Self::Store(Box::new(value.into()))
96 }
97}
98
99impl From<redb::TransactionError> for Error {
100 fn from(value: redb::TransactionError) -> Self {
101 Self::Store(Box::new(value.into()))
102 }
103}
104
105impl From<redb::TableError> for Error {
106 fn from(value: redb::TableError) -> Self {
107 Self::Store(Box::new(value.into()))
108 }
109}
110
111impl From<redb::CommitError> for Error {
112 fn from(value: redb::CommitError) -> Self {
113 Self::Store(Box::new(value.into()))
114 }
115}
116
117impl From<redb::StorageError> for Error {
118 fn from(value: redb::StorageError) -> Self {
119 Self::Store(Box::new(value.into()))
120 }
121}
122
123impl From<pallas::ledger::addresses::Error> for Error {
124 fn from(value: pallas::ledger::addresses::Error) -> Self {
125 Self::BadAddress(value)
126 }
127}
128
129impl From<std::io::Error> for Error {
130 fn from(value: std::io::Error) -> Self {
131 Self::IoError(value)
132 }
133}
134
135impl From<object_store::Error> for Error {
136 fn from(value: object_store::Error) -> Self {
137 match value {
138 object_store::Error::Generic { store, source } => {
139 Self::Config(format!("Failed to parse url: {store}, {source}"))
140 }
141 object_store::Error::NotFound { path: _, source } => {
142 Self::WorkerNotFound(source.to_string())
143 }
144 x => Self::ObjectStoreError(x),
145 }
146 }
147}
148
149pub type BlockSlot = u64;
150pub type BlockHash = pallas::crypto::hash::Hash<32>;
151
152pub enum ChainPoint {
153 Cardano(utxorpc::spec::sync::BlockRef),
154}
155
/// Sequence number returned by the store's write-ahead step and used as the
/// per-worker cursor.
pub type LogSeq = u64;
157
158pub enum TxInput {
159 Cardano(utxorpc::spec::cardano::TxInput),
160}
161
162impl TxInput {
163 pub fn to_bytes(&self) -> Vec<u8> {
164 use prost::Message;
165
166 match self {
167 Self::Cardano(tx_input) => tx_input.encode_to_vec(),
168 }
169 }
170
171 pub fn address(&self) -> Option<Vec<u8>> {
172 match self {
173 Self::Cardano(tx_input) => tx_input.as_output.as_ref().map(|o| o.address.to_vec()),
174 }
175 }
176}
177
178pub enum Utxo {
179 Cardano(utxorpc::spec::cardano::TxOutput),
180}
181
182impl Utxo {
183 pub fn to_bytes(&self) -> Vec<u8> {
184 use prost::Message;
185
186 match self {
187 Self::Cardano(utxo) => utxo.encode_to_vec(),
188 }
189 }
190
191 pub fn address(&self) -> Option<Vec<u8>> {
192 match self {
193 Self::Cardano(utxo) => Some(utxo.address.to_vec()),
194 }
195 }
196}
197
198pub enum Tx {
199 Cardano(utxorpc::spec::cardano::Tx),
200}
201
202impl Tx {
203 pub fn hash(&self) -> Vec<u8> {
204 match self {
205 Self::Cardano(tx) => tx.hash.clone().into(),
206 }
207 }
208 pub fn inputs(&self) -> Vec<TxInput> {
209 match self {
210 Self::Cardano(tx) => tx
211 .inputs
212 .iter()
213 .map(|i| TxInput::Cardano(i.clone()))
214 .collect(),
215 }
216 }
217 pub fn outputs(&self) -> Vec<Utxo> {
218 match self {
219 Self::Cardano(tx) => tx
220 .outputs
221 .iter()
222 .map(|o| Utxo::Cardano(o.clone()))
223 .collect(),
224 }
225 }
226 pub fn to_bytes(&self) -> Vec<u8> {
227 use prost::Message;
228
229 match self {
230 Self::Cardano(tx) => tx.encode_to_vec(),
231 }
232 }
233}
234
235#[derive(Debug, Clone)]
236pub enum Block {
237 Cardano(utxorpc::spec::cardano::Block),
238}
239
240impl Block {
241 pub fn hash(&self) -> Vec<u8> {
242 match self {
243 Self::Cardano(block) => block.header.as_ref().unwrap().hash.clone().into(),
244 }
245 }
246 pub fn height(&self) -> u64 {
247 match self {
248 Self::Cardano(block) => block.header.as_ref().unwrap().height,
249 }
250 }
251 pub fn slot(&self) -> u64 {
252 match self {
253 Self::Cardano(block) => block.header.as_ref().unwrap().slot,
254 }
255 }
256 pub fn txs(&self) -> Vec<Tx> {
257 match self {
258 Self::Cardano(block) => block
259 .body
260 .iter()
261 .flat_map(|b| b.tx.iter())
262 .map(|t| Tx::Cardano(t.clone()))
263 .collect(),
264 }
265 }
266
267 pub fn chain_point(&self) -> ChainPoint {
268 match self {
269 Self::Cardano(block) => ChainPoint::Cardano(BlockRef {
270 index: block.header.as_ref().unwrap().slot,
271 hash: block.header.as_ref().unwrap().hash.clone(),
272 }),
273 }
274 }
275
276 pub fn to_bytes(&self) -> Vec<u8> {
277 use prost::Message;
278
279 match self {
280 Self::Cardano(block) => block.encode_to_vec(),
281 }
282 }
283
284 pub fn from_bytes(data: &[u8]) -> Self {
285 use prost::Message;
286
287 Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap())
288 }
289}
290
291struct WorkerState {
292 pub worker_id: String,
293 pub router: router::Router,
294 pub ledger: Option<ledgers::LedgerHost>,
295 pub logging: Option<logging::LoggerHost>,
296 pub kv: Option<kv::KvHost>,
297 pub sign: Option<sign::SignerHost>,
298 pub submit: Option<submit::Submit>,
299 pub http: Option<http::Http>,
300}
301
302impl wit::balius::app::driver::Host for WorkerState {
303 async fn register_channel(
304 &mut self,
305 id: u32,
306 pattern: wit::balius::app::driver::EventPattern,
307 ) -> () {
308 self.router.register_channel(id, &pattern);
309 }
310
311 async fn register_signer(&mut self, name: String, algorithm: String) -> Vec<u8> {
312 let signer = self.sign.as_mut().expect("No sign interface defined.");
313 signer.add_key(name, algorithm).await
314 }
315}
316
317struct LoadedWorker {
318 wasm_store: wasmtime::Store<WorkerState>,
319 instance: wit::Worker,
320 cursor: Option<LogSeq>,
321 metrics: Arc<metrics::Metrics>,
322}
323
324impl LoadedWorker {
325 pub async fn dispatch_event(
326 &mut self,
327 channel: u32,
328 event: &wit::Event,
329 ) -> Result<wit::Response, Error> {
330 self.instance
331 .call_handle(&mut self.wasm_store, channel, event)
332 .await?
333 .map_err(|err| Error::Handle(err.code, err.message))
334 }
335
336 async fn acknowledge_event(&mut self, channel: u32, event: &wit::Event) -> Result<(), Error> {
337 let result = self.dispatch_event(channel, event).await;
338
339 match result {
340 Ok(wit::Response::Acknowledge) => {
341 tracing::debug!("worker acknowledge");
342 }
343 Ok(_) => {
344 tracing::warn!("worker returned unexpected data");
345 }
346 Err(Error::Handle(code, message)) => {
347 tracing::warn!(code, message);
348 }
349 Err(e) => return Err(e),
350 }
351
352 Ok(())
353 }
354
355 async fn apply_block(&mut self, block: &Block) -> Result<(), Error> {
356 let worker_id = self.wasm_store.data().worker_id.clone();
357 let block_hash = block.hash();
358 let block_height = block.height();
359 let block_slot = block.slot();
360 for tx in block.txs() {
361 let tx_hash = tx.hash();
362 let channels = self.wasm_store.data().router.find_tx_targets(&tx);
363 if !channels.is_empty() {
364 let event = wit::Event::Tx(wit::balius::app::driver::Tx {
365 block: wit::balius::app::driver::BlockRef {
366 block_hash: block_hash.clone(),
367 block_height,
368 block_slot,
369 },
370 body: tx.to_bytes(),
371 hash: tx_hash.clone(),
372 });
373 for channel in channels {
374 self.metrics.tx_handled(&worker_id);
375 self.acknowledge_event(channel, &event).await?;
376 }
377 }
378
379 for (index, utxo) in tx.outputs().into_iter().enumerate() {
380 let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
381 if channels.is_empty() {
382 continue;
383 }
384
385 let event = wit::Event::Utxo(wit::balius::app::driver::Utxo {
386 block: wit::balius::app::driver::BlockRef {
387 block_hash: block_hash.clone(),
388 block_height,
389 block_slot,
390 },
391 body: utxo.to_bytes(),
392 ref_: wit::balius::app::driver::TxoRef {
393 tx_hash: tx_hash.clone(),
394 txo_index: index as u32,
395 },
396 });
397
398 for channel in channels {
399 self.metrics.utxo_handled(&worker_id);
400 self.acknowledge_event(channel, &event).await?;
401 }
402 }
403 }
404
405 Ok(())
406 }
407
408 async fn undo_block(&mut self, block: &Block) -> Result<(), Error> {
409 let worker_id = self.wasm_store.data().worker_id.clone();
410 let block_hash = block.hash();
411 let block_height = block.height();
412 let block_slot = block.slot();
413 for tx in block.txs() {
414 let tx_hash = tx.hash();
415 for (index, utxo) in tx.outputs().into_iter().enumerate().rev() {
416 let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
417 if channels.is_empty() {
418 continue;
419 }
420
421 let event = wit::Event::UtxoUndo(wit::balius::app::driver::Utxo {
422 block: wit::balius::app::driver::BlockRef {
423 block_hash: block_hash.clone(),
424 block_height,
425 block_slot,
426 },
427 body: utxo.to_bytes(),
428 ref_: wit::balius::app::driver::TxoRef {
429 tx_hash: tx_hash.clone(),
430 txo_index: index as u32,
431 },
432 });
433
434 for channel in channels {
435 self.metrics.undo_utxo_handled(&worker_id);
436 self.acknowledge_event(channel, &event).await?;
437 }
438 }
439
440 let channels = self.wasm_store.data().router.find_tx_targets(&tx);
441 if !channels.is_empty() {
442 let event = wit::Event::TxUndo(wit::balius::app::driver::Tx {
443 block: wit::balius::app::driver::BlockRef {
444 block_hash: block_hash.clone(),
445 block_height,
446 block_slot,
447 },
448 body: tx.to_bytes(),
449 hash: tx_hash.clone(),
450 });
451 for channel in channels {
452 self.metrics.undo_tx_handled(&worker_id);
453 self.acknowledge_event(channel, &event).await?;
454 }
455 }
456 }
457
458 Ok(())
459 }
460
461 async fn apply_chain(
462 &mut self,
463 undo_blocks: &Vec<Block>,
464 next_block: &Block,
465 ) -> Result<(), Error> {
466 for block in undo_blocks {
467 self.undo_block(block).await?;
468 }
469
470 self.apply_block(next_block).await
471 }
472}
473
474type WorkerMap = HashMap<String, LoadedWorker>;
475
476#[derive(Clone)]
477pub struct Runtime {
478 engine: wasmtime::Engine,
479 linker: wasmtime::component::Linker<WorkerState>,
480 loaded: Arc<Mutex<WorkerMap>>,
481
482 store: store::Store,
483 ledger: Option<ledgers::Ledger>,
484 logging: Option<logging::Logger>,
485 kv: Option<kv::Kv>,
486 sign: Option<sign::Signer>,
487 submit: Option<submit::Submit>,
488 http: Option<http::Http>,
489
490 metrics: Arc<metrics::Metrics>,
491}
492
493impl Runtime {
494 pub fn builder(store: store::Store) -> RuntimeBuilder {
495 RuntimeBuilder::new(store)
496 }
497
498 pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
499 let lowest_seq = self
500 .loaded
501 .lock()
502 .await
503 .values()
504 .flat_map(|w| w.cursor)
505 .min();
506
507 if let Some(seq) = lowest_seq {
508 debug!(lowest_seq, "found lowest seq");
509 return self.store.find_chain_point(seq);
510 }
511
512 Ok(None)
513 }
514
515 pub async fn register_worker(
516 &self,
517 id: &str,
518 wasm: &[u8],
519 config: serde_json::Value,
520 ) -> Result<(), Error> {
521 let component = wasmtime::component::Component::new(&self.engine, wasm)?;
522
523 let mut wasm_store = wasmtime::Store::new(
524 &self.engine,
525 WorkerState {
526 worker_id: id.to_owned(),
527 router: Router::new(),
528 ledger: self
529 .ledger
530 .as_ref()
531 .map(|l| LedgerHost::new(id, l, &self.metrics)),
532 logging: self
533 .logging
534 .as_ref()
535 .map(|kv| LoggerHost::new(id, kv, &self.metrics)),
536 kv: self
537 .kv
538 .as_ref()
539 .map(|kv| KvHost::new(id, kv, &self.metrics)),
540 sign: self
541 .sign
542 .as_ref()
543 .map(|s| SignerHost::new(id, s, &self.metrics)),
544 submit: self.submit.clone(),
545 http: self.http.clone(),
546 },
547 );
548
549 let instance =
550 wit::Worker::instantiate_async(&mut wasm_store, &component, &self.linker).await?;
551
552 let config = serde_json::to_vec(&config).unwrap();
553 instance.call_init(&mut wasm_store, &config).await?;
554
555 let cursor = self.store.get_worker_cursor(id)?;
556 debug!(cursor, id, "found cursor for worker");
557
558 self.loaded.lock().await.insert(
559 id.to_owned(),
560 LoadedWorker {
561 wasm_store,
562 instance,
563 cursor,
564 metrics: self.metrics.clone(),
565 },
566 );
567
568 Ok(())
569 }
570
571 pub async fn register_worker_from_url(
576 &self,
577 id: &str,
578 url: &url::Url,
579 config: serde_json::Value,
580 ) -> Result<(), Error> {
581 let (store, path) = object_store::parse_url(url)?;
582 let bytes = store.get(&path).await?.bytes().await?;
583 self.register_worker(id, &bytes, config).await
584 }
585
586 pub async fn register_worker_from_file(
587 &self,
588 id: &str,
589 wasm_path: impl AsRef<Path>,
590 config: serde_json::Value,
591 ) -> Result<(), Error> {
592 let mut file = std::fs::File::open(wasm_path)?;
593 let mut buffer = Vec::new();
594 file.read_to_end(&mut buffer)?;
595 self.register_worker(id, &buffer, config).await
596 }
597
598 pub async fn remove_worker(&self, id: &str) -> Result<(), Error> {
599 match self.loaded.lock().await.remove(id) {
600 Some(_) => {
601 info!(worker = id, "Successfully removed worker from runtime.")
602 }
603 None => {
604 warn!(worker = id, "Worker not found, skipping remove.")
605 }
606 }
607
608 Ok(())
609 }
610
611 pub async fn handle_chain(
612 &mut self,
613 undo_blocks: &Vec<Block>,
614 next_block: &Block,
615 ) -> Result<(), Error> {
616 info!("applying block");
617
618 let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
619
620 let mut workers = self.loaded.lock().await;
621
622 let mut store_update = self.store.start_atomic_update(log_seq)?;
623
624 for (_, worker) in workers.iter_mut() {
625 worker.apply_chain(undo_blocks, next_block).await?;
626 store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
627 }
628
629 store_update.commit()?;
630
631 Ok(())
632 }
633
634 pub async fn handle_request(
635 &self,
636 worker_id: &str,
637 method: &str,
638 params: Vec<u8>,
639 ) -> Result<wit::Response, Error> {
640 let mut lock = self.loaded.lock().await;
641
642 let worker = lock
643 .get_mut(worker_id)
644 .ok_or(Error::WorkerNotFound(worker_id.to_string()))?;
645
646 let channel = worker
647 .wasm_store
648 .data()
649 .router
650 .find_request_target(method)?;
651
652 let evt = wit::Event::Request(params);
653
654 let result = worker.dispatch_event(channel, &evt).await;
655 self.metrics.request(worker_id, method, result.is_ok());
656 result
657 }
658}
659
660struct HasField<T>(std::marker::PhantomData<T>);
661impl<T: 'static> wasmtime::component::HasData for HasField<T> {
662 type Data<'a> = &'a mut T;
663}
664
665pub struct RuntimeBuilder {
666 store: store::Store,
667 engine: wasmtime::Engine,
668 linker: wasmtime::component::Linker<WorkerState>,
669 ledger: Option<ledgers::Ledger>,
670 logging: Option<logging::Logger>,
671 kv: Option<kv::Kv>,
672 sign: Option<sign::Signer>,
673 submit: Option<submit::Submit>,
674 http: Option<http::Http>,
675}
676
677impl RuntimeBuilder {
678 pub fn new(store: store::Store) -> Self {
679 let mut config = wasmtime::Config::new();
680 config.async_support(true);
681 let engine = wasmtime::Engine::new(&config).unwrap();
682 let mut linker = wasmtime::component::Linker::new(&engine);
683
684 wit::balius::app::driver::add_to_linker::<_, HasSelf<_>>(
685 &mut linker,
686 |state: &mut WorkerState| state,
687 )
688 .unwrap();
689
690 Self {
691 store,
692 engine,
693 linker,
694 ledger: None,
695 logging: None,
696 kv: None,
697 sign: None,
698 submit: None,
699 http: None,
700 }
701 }
702
703 pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
704 self.ledger = Some(ledger);
705
706 wit::balius::app::ledger::add_to_linker::<_, HasField<_>>(
707 &mut self.linker,
708 |state: &mut WorkerState| state.ledger.as_mut().unwrap(),
709 )
710 .unwrap();
711
712 self
713 }
714
715 pub fn with_kv(mut self, kv: kv::Kv) -> Self {
716 wit::balius::app::kv::add_to_linker::<_, HasField<_>>(
717 &mut self.linker,
718 |state: &mut WorkerState| state.kv.as_mut().unwrap(),
719 )
720 .unwrap();
721 self.kv = Some(kv);
722
723 self
724 }
725
726 pub fn with_logger(mut self, logging: logging::Logger) -> Self {
727 self.logging = Some(logging);
728
729 wit::balius::app::logging::add_to_linker::<_, HasField<_>>(
730 &mut self.linker,
731 |state: &mut WorkerState| state.logging.as_mut().unwrap(),
732 )
733 .unwrap();
734
735 self
736 }
737
738 pub fn with_signer(mut self, sign: sign::Signer) -> Self {
739 self.sign = Some(sign);
740
741 wit::balius::app::sign::add_to_linker::<_, HasField<_>>(
742 &mut self.linker,
743 |state: &mut WorkerState| state.sign.as_mut().unwrap(),
744 )
745 .unwrap();
746
747 self
748 }
749
750 pub fn with_submit(mut self, submit: submit::Submit) -> Self {
751 self.submit = Some(submit);
752
753 wit::balius::app::submit::add_to_linker::<_, HasField<_>>(
754 &mut self.linker,
755 |state: &mut WorkerState| state.submit.as_mut().unwrap(),
756 )
757 .unwrap();
758
759 self
760 }
761
762 pub fn with_http(mut self, http: http::Http) -> Self {
763 self.http = Some(http);
764 wit::balius::app::http::add_to_linker::<_, HasField<_>>(
765 &mut self.linker,
766 |state: &mut WorkerState| state.http.as_mut().unwrap(),
767 )
768 .unwrap();
769
770 self
771 }
772
773 pub fn build(self) -> Result<Runtime, Error> {
774 let mut this = self;
775 if this.logging.is_none() {
776 this = this.with_logger(logging::Logger::Silent);
777 }
778
779 let RuntimeBuilder {
780 store,
781 engine,
782 linker,
783 ledger,
784 logging,
785 kv,
786 sign,
787 submit,
788 http,
789 } = this;
790
791 Ok(Runtime {
792 metrics: Default::default(),
793 loaded: Default::default(),
794 engine,
795 linker,
796 store,
797 ledger,
798 logging,
799 kv,
800 sign,
801 submit,
802 http,
803 })
804 }
805}