1use kv::KvHost;
2use logging::LoggerHost;
3use router::Router;
4use sign::SignerHost;
5use std::{collections::HashMap, io::Read, path::Path, sync::Arc};
6use thiserror::Error;
7use tokio::sync::Mutex;
8use tracing::{debug, info, warn};
9use utxorpc::spec::sync::BlockRef;
10
11pub mod wit {
12 wasmtime::component::bindgen!({
13 path: "./wit",
14 async: true,
15 tracing: true,
16 });
17}
18
19mod metrics;
20mod router;
21mod store;
22
23pub mod drivers;
25pub mod http;
26pub mod kv;
27pub mod ledgers;
28pub mod logging;
29pub mod sign;
30pub mod submit;
31
32pub use store::Store;
33pub use wit::Response;
34
35pub type WorkerId = String;
36
37#[derive(Error, Debug)]
38pub enum Error {
39 #[error("wasm error {0}")]
40 Wasm(wasmtime::Error),
41
42 #[error("store error {0}")]
43 Store(Box<redb::Error>),
44
45 #[error("worker not found '{0}'")]
46 WorkerNotFound(WorkerId),
47
48 #[error("worker failed to handle event (code: '{0}', message: '{1}')")]
49 Handle(u32, String),
50
51 #[error("no target available to solve request")]
52 NoTarget,
53
54 #[error("more than one target available to solve request")]
55 AmbiguousTarget,
56
57 #[error("address in block failed to parse")]
58 BadAddress(pallas::ledger::addresses::Error),
59
60 #[error("ledger error: {0}")]
61 Ledger(String),
62
63 #[error("config error: {0}")]
64 Config(String),
65
66 #[error("driver error: {0}")]
67 Driver(String),
68
69 #[error("failed to interact with WASM object: {0}")]
70 ObjectStoreError(object_store::Error),
71
72 #[error("failed to interact with local WASM file: {0}")]
73 IoError(std::io::Error),
74}
75
76impl From<wasmtime::Error> for Error {
77 fn from(value: wasmtime::Error) -> Self {
78 Self::Wasm(value)
79 }
80}
81
82impl From<redb::Error> for Error {
83 fn from(value: redb::Error) -> Self {
84 Self::Store(Box::new(value))
85 }
86}
87
88impl From<redb::DatabaseError> for Error {
89 fn from(value: redb::DatabaseError) -> Self {
90 Self::Store(Box::new(value.into()))
91 }
92}
93
94impl From<redb::TransactionError> for Error {
95 fn from(value: redb::TransactionError) -> Self {
96 Self::Store(Box::new(value.into()))
97 }
98}
99
100impl From<redb::TableError> for Error {
101 fn from(value: redb::TableError) -> Self {
102 Self::Store(Box::new(value.into()))
103 }
104}
105
106impl From<redb::CommitError> for Error {
107 fn from(value: redb::CommitError) -> Self {
108 Self::Store(Box::new(value.into()))
109 }
110}
111
112impl From<redb::StorageError> for Error {
113 fn from(value: redb::StorageError) -> Self {
114 Self::Store(Box::new(value.into()))
115 }
116}
117
118impl From<pallas::ledger::addresses::Error> for Error {
119 fn from(value: pallas::ledger::addresses::Error) -> Self {
120 Self::BadAddress(value)
121 }
122}
123
124impl From<std::io::Error> for Error {
125 fn from(value: std::io::Error) -> Self {
126 Self::IoError(value)
127 }
128}
129
130impl From<object_store::Error> for Error {
131 fn from(value: object_store::Error) -> Self {
132 match value {
133 object_store::Error::Generic { store, source } => {
134 Self::Config(format!("Failed to parse url: {}, {}", store, source))
135 }
136 object_store::Error::NotFound { path: _, source } => {
137 Self::WorkerNotFound(source.to_string())
138 }
139 x => Self::ObjectStoreError(x),
140 }
141 }
142}
143
144pub type BlockSlot = u64;
145pub type BlockHash = pallas::crypto::hash::Hash<32>;
146
147pub enum ChainPoint {
148 Cardano(utxorpc::spec::sync::BlockRef),
149}
150
151pub type LogSeq = u64;
152
153pub enum TxInput {
154 Cardano(utxorpc::spec::cardano::TxInput),
155}
156
157impl TxInput {
158 pub fn to_bytes(&self) -> Vec<u8> {
159 use prost::Message;
160
161 match self {
162 Self::Cardano(tx_input) => tx_input.encode_to_vec(),
163 }
164 }
165
166 pub fn address(&self) -> Option<Vec<u8>> {
167 match self {
168 Self::Cardano(tx_input) => tx_input.as_output.as_ref().map(|o| o.address.to_vec()),
169 }
170 }
171}
172
173pub enum Utxo {
174 Cardano(utxorpc::spec::cardano::TxOutput),
175}
176
177impl Utxo {
178 pub fn to_bytes(&self) -> Vec<u8> {
179 use prost::Message;
180
181 match self {
182 Self::Cardano(utxo) => utxo.encode_to_vec(),
183 }
184 }
185
186 pub fn address(&self) -> Option<Vec<u8>> {
187 match self {
188 Self::Cardano(utxo) => Some(utxo.address.to_vec()),
189 }
190 }
191}
192
193pub enum Tx {
194 Cardano(utxorpc::spec::cardano::Tx),
195}
196
197impl Tx {
198 pub fn hash(&self) -> Vec<u8> {
199 match self {
200 Self::Cardano(tx) => tx.hash.clone().into(),
201 }
202 }
203 pub fn inputs(&self) -> Vec<TxInput> {
204 match self {
205 Self::Cardano(tx) => tx
206 .inputs
207 .iter()
208 .map(|i| TxInput::Cardano(i.clone()))
209 .collect(),
210 }
211 }
212 pub fn outputs(&self) -> Vec<Utxo> {
213 match self {
214 Self::Cardano(tx) => tx
215 .outputs
216 .iter()
217 .map(|o| Utxo::Cardano(o.clone()))
218 .collect(),
219 }
220 }
221 pub fn to_bytes(&self) -> Vec<u8> {
222 use prost::Message;
223
224 match self {
225 Self::Cardano(tx) => tx.encode_to_vec(),
226 }
227 }
228}
229
230#[derive(Debug, Clone)]
231pub enum Block {
232 Cardano(utxorpc::spec::cardano::Block),
233}
234
235impl Block {
236 pub fn hash(&self) -> Vec<u8> {
237 match self {
238 Self::Cardano(block) => block.header.as_ref().unwrap().hash.clone().into(),
239 }
240 }
241 pub fn height(&self) -> u64 {
242 match self {
243 Self::Cardano(block) => block.header.as_ref().unwrap().height,
244 }
245 }
246 pub fn slot(&self) -> u64 {
247 match self {
248 Self::Cardano(block) => block.header.as_ref().unwrap().slot,
249 }
250 }
251 pub fn txs(&self) -> Vec<Tx> {
252 match self {
253 Self::Cardano(block) => block
254 .body
255 .iter()
256 .flat_map(|b| b.tx.iter())
257 .map(|t| Tx::Cardano(t.clone()))
258 .collect(),
259 }
260 }
261
262 pub fn chain_point(&self) -> ChainPoint {
263 match self {
264 Self::Cardano(block) => ChainPoint::Cardano(BlockRef {
265 index: block.header.as_ref().unwrap().slot,
266 hash: block.header.as_ref().unwrap().hash.clone(),
267 }),
268 }
269 }
270
271 pub fn to_bytes(&self) -> Vec<u8> {
272 use prost::Message;
273
274 match self {
275 Self::Cardano(block) => block.encode_to_vec(),
276 }
277 }
278
279 pub fn from_bytes(data: &[u8]) -> Self {
280 use prost::Message;
281
282 Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap())
283 }
284}
285
286struct WorkerState {
287 pub worker_id: String,
288 pub router: router::Router,
289 pub ledger: Option<ledgers::Ledger>,
290 pub logging: Option<logging::LoggerHost>,
291 pub kv: Option<kv::KvHost>,
292 pub sign: Option<sign::SignerHost>,
293 pub submit: Option<submit::Submit>,
294 pub http: Option<http::Http>,
295}
296
297#[async_trait::async_trait]
298impl wit::balius::app::driver::Host for WorkerState {
299 async fn register_channel(
300 &mut self,
301 id: u32,
302 pattern: wit::balius::app::driver::EventPattern,
303 ) -> () {
304 self.router.register_channel(id, &pattern);
305 }
306
307 async fn register_signer(&mut self, name: String, algorithm: String) -> Vec<u8> {
308 let signer = self.sign.as_mut().expect("No sign interface defined.");
309 signer.add_key(name, algorithm).await
310 }
311}
312
313struct LoadedWorker {
314 wasm_store: wasmtime::Store<WorkerState>,
315 instance: wit::Worker,
316 cursor: Option<LogSeq>,
317 metrics: Arc<metrics::Metrics>,
318}
319
320impl LoadedWorker {
321 pub async fn dispatch_event(
322 &mut self,
323 channel: u32,
324 event: &wit::Event,
325 ) -> Result<wit::Response, Error> {
326 self.instance
327 .call_handle(&mut self.wasm_store, channel, event)
328 .await?
329 .map_err(|err| Error::Handle(err.code, err.message))
330 }
331
332 async fn acknowledge_event(&mut self, channel: u32, event: &wit::Event) -> Result<(), Error> {
333 let result = self.dispatch_event(channel, event).await;
334
335 match result {
336 Ok(wit::Response::Acknowledge) => {
337 tracing::debug!("worker acknowledge");
338 }
339 Ok(_) => {
340 tracing::warn!("worker returned unexpected data");
341 }
342 Err(Error::Handle(code, message)) => {
343 tracing::warn!(code, message);
344 }
345 Err(e) => return Err(e),
346 }
347
348 Ok(())
349 }
350
351 async fn apply_block(&mut self, block: &Block) -> Result<(), Error> {
352 let worker_id = self.wasm_store.data().worker_id.clone();
353 let block_hash = block.hash();
354 let block_height = block.height();
355 let block_slot = block.slot();
356 for tx in block.txs() {
357 let tx_hash = tx.hash();
358 let channels = self.wasm_store.data().router.find_tx_targets(&tx);
359 if !channels.is_empty() {
360 let event = wit::Event::Tx(wit::balius::app::driver::Tx {
361 block: wit::balius::app::driver::BlockRef {
362 block_hash: block_hash.clone(),
363 block_height,
364 block_slot,
365 },
366 body: tx.to_bytes(),
367 hash: tx_hash.clone(),
368 });
369 for channel in channels {
370 self.metrics.tx_handled(&worker_id);
371 self.acknowledge_event(channel, &event).await?;
372 }
373 }
374
375 for (index, utxo) in tx.outputs().into_iter().enumerate() {
376 let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
377 if channels.is_empty() {
378 continue;
379 }
380
381 let event = wit::Event::Utxo(wit::balius::app::driver::Utxo {
382 block: wit::balius::app::driver::BlockRef {
383 block_hash: block_hash.clone(),
384 block_height,
385 block_slot,
386 },
387 body: utxo.to_bytes(),
388 ref_: wit::balius::app::driver::TxoRef {
389 tx_hash: tx_hash.clone(),
390 txo_index: index as u32,
391 },
392 });
393
394 for channel in channels {
395 self.metrics.utxo_handled(&worker_id);
396 self.acknowledge_event(channel, &event).await?;
397 }
398 }
399 }
400
401 Ok(())
402 }
403
404 async fn undo_block(&mut self, block: &Block) -> Result<(), Error> {
405 let worker_id = self.wasm_store.data().worker_id.clone();
406 let block_hash = block.hash();
407 let block_height = block.height();
408 let block_slot = block.slot();
409 for tx in block.txs() {
410 let tx_hash = tx.hash();
411 for (index, utxo) in tx.outputs().into_iter().enumerate().rev() {
412 let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
413 if channels.is_empty() {
414 continue;
415 }
416
417 let event = wit::Event::UtxoUndo(wit::balius::app::driver::Utxo {
418 block: wit::balius::app::driver::BlockRef {
419 block_hash: block_hash.clone(),
420 block_height,
421 block_slot,
422 },
423 body: utxo.to_bytes(),
424 ref_: wit::balius::app::driver::TxoRef {
425 tx_hash: tx_hash.clone(),
426 txo_index: index as u32,
427 },
428 });
429
430 for channel in channels {
431 self.metrics.undo_utxo_handled(&worker_id);
432 self.acknowledge_event(channel, &event).await?;
433 }
434 }
435
436 let channels = self.wasm_store.data().router.find_tx_targets(&tx);
437 if !channels.is_empty() {
438 let event = wit::Event::TxUndo(wit::balius::app::driver::Tx {
439 block: wit::balius::app::driver::BlockRef {
440 block_hash: block_hash.clone(),
441 block_height,
442 block_slot,
443 },
444 body: tx.to_bytes(),
445 hash: tx_hash.clone(),
446 });
447 for channel in channels {
448 self.metrics.undo_tx_handled(&worker_id);
449 self.acknowledge_event(channel, &event).await?;
450 }
451 }
452 }
453
454 Ok(())
455 }
456
457 async fn apply_chain(
458 &mut self,
459 undo_blocks: &Vec<Block>,
460 next_block: &Block,
461 ) -> Result<(), Error> {
462 for block in undo_blocks {
463 self.undo_block(block).await?;
464 }
465
466 self.apply_block(next_block).await
467 }
468}
469
470type WorkerMap = HashMap<String, LoadedWorker>;
471
472#[derive(Clone)]
473pub struct Runtime {
474 engine: wasmtime::Engine,
475 linker: wasmtime::component::Linker<WorkerState>,
476 loaded: Arc<Mutex<WorkerMap>>,
477
478 store: store::Store,
479 ledger: Option<ledgers::Ledger>,
480 logging: Option<logging::Logger>,
481 kv: Option<kv::Kv>,
482 sign: Option<sign::Signer>,
483 submit: Option<submit::Submit>,
484 http: Option<http::Http>,
485
486 metrics: Arc<metrics::Metrics>,
487}
488
489impl Runtime {
490 pub fn builder(store: store::Store) -> RuntimeBuilder {
491 RuntimeBuilder::new(store)
492 }
493
494 pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
495 let lowest_seq = self
496 .loaded
497 .lock()
498 .await
499 .values()
500 .flat_map(|w| w.cursor)
501 .min();
502
503 if let Some(seq) = lowest_seq {
504 debug!(lowest_seq, "found lowest seq");
505 return self.store.find_chain_point(seq);
506 }
507
508 Ok(None)
509 }
510
511 pub async fn register_worker(
512 &self,
513 id: &str,
514 wasm: &[u8],
515 config: serde_json::Value,
516 ) -> Result<(), Error> {
517 let component = wasmtime::component::Component::new(&self.engine, wasm)?;
518
519 let mut wasm_store = wasmtime::Store::new(
520 &self.engine,
521 WorkerState {
522 worker_id: id.to_owned(),
523 router: Router::new(),
524 ledger: self.ledger.clone(),
525 logging: self.logging.as_ref().map(|kv| LoggerHost::new(id, kv)),
526 kv: self
527 .kv
528 .as_ref()
529 .map(|kv| KvHost::new(id, kv, &self.metrics)),
530 sign: self.sign.as_ref().map(|s| SignerHost::new(id, s)),
531 submit: self.submit.clone(),
532 http: self.http.clone(),
533 },
534 );
535
536 let instance =
537 wit::Worker::instantiate_async(&mut wasm_store, &component, &self.linker).await?;
538
539 let config = serde_json::to_vec(&config).unwrap();
540 instance.call_init(&mut wasm_store, &config).await?;
541
542 let cursor = self.store.get_worker_cursor(id)?;
543 debug!(cursor, id, "found cursor for worker");
544
545 self.loaded.lock().await.insert(
546 id.to_owned(),
547 LoadedWorker {
548 wasm_store,
549 instance,
550 cursor,
551 metrics: self.metrics.clone(),
552 },
553 );
554
555 Ok(())
556 }
557
558 pub async fn register_worker_from_url(
563 &self,
564 id: &str,
565 url: &url::Url,
566 config: serde_json::Value,
567 ) -> Result<(), Error> {
568 let (store, path) = object_store::parse_url(url)?;
569 let bytes = store.get(&path).await?.bytes().await?;
570 self.register_worker(id, &bytes, config).await
571 }
572
573 pub async fn register_worker_from_file(
574 &self,
575 id: &str,
576 wasm_path: impl AsRef<Path>,
577 config: serde_json::Value,
578 ) -> Result<(), Error> {
579 let mut file = std::fs::File::open(wasm_path)?;
580 let mut buffer = Vec::new();
581 file.read_to_end(&mut buffer)?;
582 self.register_worker(id, &buffer, config).await
583 }
584
585 pub async fn remove_worker(&self, id: &str) -> Result<(), Error> {
586 match self.loaded.lock().await.remove(id) {
587 Some(_) => {
588 info!(worker = id, "Successfully removed worker from runtime.")
589 }
590 None => {
591 warn!(worker = id, "Worker not found, skipping remove.")
592 }
593 }
594
595 Ok(())
596 }
597
598 pub async fn handle_chain(
599 &mut self,
600 undo_blocks: &Vec<Block>,
601 next_block: &Block,
602 ) -> Result<(), Error> {
603 info!("applying block");
604
605 let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
606
607 let mut workers = self.loaded.lock().await;
608
609 let mut store_update = self.store.start_atomic_update(log_seq)?;
610
611 for (_, worker) in workers.iter_mut() {
612 worker.apply_chain(undo_blocks, next_block).await?;
613 store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
614 }
615
616 store_update.commit()?;
617
618 Ok(())
619 }
620
621 pub async fn handle_request(
622 &self,
623 worker_id: &str,
624 method: &str,
625 params: Vec<u8>,
626 ) -> Result<wit::Response, Error> {
627 let mut lock = self.loaded.lock().await;
628
629 let worker = lock
630 .get_mut(worker_id)
631 .ok_or(Error::WorkerNotFound(worker_id.to_string()))?;
632
633 let channel = worker
634 .wasm_store
635 .data()
636 .router
637 .find_request_target(method)?;
638
639 let evt = wit::Event::Request(params);
640
641 let result = worker.dispatch_event(channel, &evt).await;
642 self.metrics.request(worker_id, method, result.is_ok());
643 result
644 }
645}
646
647pub struct RuntimeBuilder {
648 store: store::Store,
649 engine: wasmtime::Engine,
650 linker: wasmtime::component::Linker<WorkerState>,
651 ledger: Option<ledgers::Ledger>,
652 logging: Option<logging::Logger>,
653 kv: Option<kv::Kv>,
654 sign: Option<sign::Signer>,
655 submit: Option<submit::Submit>,
656 http: Option<http::Http>,
657}
658
659impl RuntimeBuilder {
660 pub fn new(store: store::Store) -> Self {
661 let mut config = wasmtime::Config::new();
662 config.async_support(true);
663 let engine = wasmtime::Engine::new(&config).unwrap();
664 let mut linker = wasmtime::component::Linker::new(&engine);
665
666 wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut WorkerState| state)
667 .unwrap();
668
669 Self {
670 store,
671 engine,
672 linker,
673 ledger: None,
674 logging: None,
675 kv: None,
676 sign: None,
677 submit: None,
678 http: None,
679 }
680 }
681
682 pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
683 self.ledger = Some(ledger);
684
685 wit::balius::app::ledger::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
686 state.ledger.as_mut().unwrap()
687 })
688 .unwrap();
689
690 self
691 }
692
693 pub fn with_kv(mut self, kv: kv::Kv) -> Self {
694 wit::balius::app::kv::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
695 state.kv.as_mut().unwrap()
696 })
697 .unwrap();
698 self.kv = Some(kv);
699
700 self
701 }
702
703 pub fn with_logger(mut self, logging: logging::Logger) -> Self {
704 self.logging = Some(logging);
705
706 wit::balius::app::logging::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
707 state.logging.as_mut().unwrap()
708 })
709 .unwrap();
710
711 self
712 }
713
714 pub fn with_signer(mut self, sign: sign::Signer) -> Self {
715 self.sign = Some(sign);
716
717 wit::balius::app::sign::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
718 state.sign.as_mut().unwrap()
719 })
720 .unwrap();
721
722 self
723 }
724
725 pub fn with_submit(mut self, submit: submit::Submit) -> Self {
726 self.submit = Some(submit);
727
728 wit::balius::app::submit::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
729 state.submit.as_mut().unwrap()
730 })
731 .unwrap();
732
733 self
734 }
735
736 pub fn with_http(mut self, http: http::Http) -> Self {
737 self.http = Some(http);
738 wit::balius::app::http::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
739 state.http.as_mut().unwrap()
740 })
741 .unwrap();
742
743 self
744 }
745
746 pub fn build(self) -> Result<Runtime, Error> {
747 let mut this = self;
748 if this.logging.is_none() {
749 this = this.with_logger(logging::Logger::Silent);
750 }
751
752 let RuntimeBuilder {
753 store,
754 engine,
755 linker,
756 ledger,
757 logging,
758 kv,
759 sign,
760 submit,
761 http,
762 } = this;
763
764 Ok(Runtime {
765 metrics: Default::default(),
766 loaded: Default::default(),
767 engine,
768 linker,
769 store,
770 ledger,
771 logging,
772 kv,
773 sign,
774 submit,
775 http,
776 })
777 }
778}