balius_runtime/
lib.rs

1use kv::KvHost;
2use ledgers::LedgerHost;
3use logging::LoggerHost;
4use router::Router;
5use sign::SignerHost;
6use std::{collections::HashMap, io::Read, path::Path, sync::Arc};
7use thiserror::Error;
8use tokio::sync::Mutex;
9use tracing::{debug, info, warn};
10use utxorpc::spec::sync::BlockRef;
11use wasmtime::component::HasSelf;
12
13pub mod wit {
14    wasmtime::component::bindgen!({
15        path: "./wit",
16        async: true,
17        tracing: true,
18    });
19}
20
21mod metrics;
22mod router;
23mod store;
24
25// implementations
26pub mod drivers;
27pub mod http;
28pub mod kv;
29pub mod ledgers;
30pub mod logging;
31pub mod sign;
32pub mod submit;
33
34pub use store::Store;
35pub use wit::Response;
36
37pub type WorkerId = String;
38
39#[derive(Error, Debug)]
40pub enum Error {
41    #[error("wasm error {0}")]
42    Wasm(wasmtime::Error),
43
44    #[error("store error {0}")]
45    Store(Box<redb::Error>),
46
47    #[error("worker not found '{0}'")]
48    WorkerNotFound(WorkerId),
49
50    #[error("worker failed to handle event (code: '{0}', message: '{1}')")]
51    Handle(u32, String),
52
53    #[error("no target available to solve request")]
54    NoTarget,
55
56    #[error("more than one target available to solve request")]
57    AmbiguousTarget,
58
59    #[error("address in block failed to parse")]
60    BadAddress(pallas::ledger::addresses::Error),
61
62    #[error("ledger error: {0}")]
63    Ledger(String),
64
65    #[error("config error: {0}")]
66    Config(String),
67
68    #[error("driver error: {0}")]
69    Driver(String),
70
71    #[error("failed to interact with WASM object: {0}")]
72    ObjectStoreError(object_store::Error),
73
74    #[error("failed to interact with local WASM file: {0}")]
75    IoError(std::io::Error),
76
77    #[error("kv error {0}")]
78    KvError(String),
79}
80
81impl From<wasmtime::Error> for Error {
82    fn from(value: wasmtime::Error) -> Self {
83        Self::Wasm(value)
84    }
85}
86
87impl From<redb::Error> for Error {
88    fn from(value: redb::Error) -> Self {
89        Self::Store(Box::new(value))
90    }
91}
92
93impl From<redb::DatabaseError> for Error {
94    fn from(value: redb::DatabaseError) -> Self {
95        Self::Store(Box::new(value.into()))
96    }
97}
98
99impl From<redb::TransactionError> for Error {
100    fn from(value: redb::TransactionError) -> Self {
101        Self::Store(Box::new(value.into()))
102    }
103}
104
105impl From<redb::TableError> for Error {
106    fn from(value: redb::TableError) -> Self {
107        Self::Store(Box::new(value.into()))
108    }
109}
110
111impl From<redb::CommitError> for Error {
112    fn from(value: redb::CommitError) -> Self {
113        Self::Store(Box::new(value.into()))
114    }
115}
116
117impl From<redb::StorageError> for Error {
118    fn from(value: redb::StorageError) -> Self {
119        Self::Store(Box::new(value.into()))
120    }
121}
122
123impl From<pallas::ledger::addresses::Error> for Error {
124    fn from(value: pallas::ledger::addresses::Error) -> Self {
125        Self::BadAddress(value)
126    }
127}
128
129impl From<std::io::Error> for Error {
130    fn from(value: std::io::Error) -> Self {
131        Self::IoError(value)
132    }
133}
134
135impl From<object_store::Error> for Error {
136    fn from(value: object_store::Error) -> Self {
137        match value {
138            object_store::Error::Generic { store, source } => {
139                Self::Config(format!("Failed to parse url: {store}, {source}"))
140            }
141            object_store::Error::NotFound { path: _, source } => {
142                Self::WorkerNotFound(source.to_string())
143            }
144            x => Self::ObjectStoreError(x),
145        }
146    }
147}
148
149pub type BlockSlot = u64;
150pub type BlockHash = pallas::crypto::hash::Hash<32>;
151
152pub enum ChainPoint {
153    Cardano(utxorpc::spec::sync::BlockRef),
154}
155
156pub type LogSeq = u64;
157
158pub enum TxInput {
159    Cardano(utxorpc::spec::cardano::TxInput),
160}
161
162impl TxInput {
163    pub fn to_bytes(&self) -> Vec<u8> {
164        use prost::Message;
165
166        match self {
167            Self::Cardano(tx_input) => tx_input.encode_to_vec(),
168        }
169    }
170
171    pub fn address(&self) -> Option<Vec<u8>> {
172        match self {
173            Self::Cardano(tx_input) => tx_input.as_output.as_ref().map(|o| o.address.to_vec()),
174        }
175    }
176}
177
178pub enum Utxo {
179    Cardano(utxorpc::spec::cardano::TxOutput),
180}
181
182impl Utxo {
183    pub fn to_bytes(&self) -> Vec<u8> {
184        use prost::Message;
185
186        match self {
187            Self::Cardano(utxo) => utxo.encode_to_vec(),
188        }
189    }
190
191    pub fn address(&self) -> Option<Vec<u8>> {
192        match self {
193            Self::Cardano(utxo) => Some(utxo.address.to_vec()),
194        }
195    }
196}
197
198pub enum Tx {
199    Cardano(utxorpc::spec::cardano::Tx),
200}
201
202impl Tx {
203    pub fn hash(&self) -> Vec<u8> {
204        match self {
205            Self::Cardano(tx) => tx.hash.clone().into(),
206        }
207    }
208    pub fn inputs(&self) -> Vec<TxInput> {
209        match self {
210            Self::Cardano(tx) => tx
211                .inputs
212                .iter()
213                .map(|i| TxInput::Cardano(i.clone()))
214                .collect(),
215        }
216    }
217    pub fn outputs(&self) -> Vec<Utxo> {
218        match self {
219            Self::Cardano(tx) => tx
220                .outputs
221                .iter()
222                .map(|o| Utxo::Cardano(o.clone()))
223                .collect(),
224        }
225    }
226    pub fn to_bytes(&self) -> Vec<u8> {
227        use prost::Message;
228
229        match self {
230            Self::Cardano(tx) => tx.encode_to_vec(),
231        }
232    }
233}
234
235#[derive(Debug, Clone)]
236pub enum Block {
237    Cardano(utxorpc::spec::cardano::Block),
238}
239
240impl Block {
241    pub fn hash(&self) -> Vec<u8> {
242        match self {
243            Self::Cardano(block) => block.header.as_ref().unwrap().hash.clone().into(),
244        }
245    }
246    pub fn height(&self) -> u64 {
247        match self {
248            Self::Cardano(block) => block.header.as_ref().unwrap().height,
249        }
250    }
251    pub fn slot(&self) -> u64 {
252        match self {
253            Self::Cardano(block) => block.header.as_ref().unwrap().slot,
254        }
255    }
256    pub fn txs(&self) -> Vec<Tx> {
257        match self {
258            Self::Cardano(block) => block
259                .body
260                .iter()
261                .flat_map(|b| b.tx.iter())
262                .map(|t| Tx::Cardano(t.clone()))
263                .collect(),
264        }
265    }
266
267    pub fn chain_point(&self) -> ChainPoint {
268        match self {
269            Self::Cardano(block) => ChainPoint::Cardano(BlockRef {
270                index: block.header.as_ref().unwrap().slot,
271                hash: block.header.as_ref().unwrap().hash.clone(),
272            }),
273        }
274    }
275
276    pub fn to_bytes(&self) -> Vec<u8> {
277        use prost::Message;
278
279        match self {
280            Self::Cardano(block) => block.encode_to_vec(),
281        }
282    }
283
284    pub fn from_bytes(data: &[u8]) -> Self {
285        use prost::Message;
286
287        Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap())
288    }
289}
290
291struct WorkerState {
292    pub worker_id: String,
293    pub router: router::Router,
294    pub ledger: Option<ledgers::LedgerHost>,
295    pub logging: Option<logging::LoggerHost>,
296    pub kv: Option<kv::KvHost>,
297    pub sign: Option<sign::SignerHost>,
298    pub submit: Option<submit::Submit>,
299    pub http: Option<http::Http>,
300}
301
302impl wit::balius::app::driver::Host for WorkerState {
303    async fn register_channel(
304        &mut self,
305        id: u32,
306        pattern: wit::balius::app::driver::EventPattern,
307    ) -> () {
308        self.router.register_channel(id, &pattern);
309    }
310
311    async fn register_signer(&mut self, name: String, algorithm: String) -> Vec<u8> {
312        let signer = self.sign.as_mut().expect("No sign interface defined.");
313        signer.add_key(name, algorithm).await
314    }
315}
316
317struct LoadedWorker {
318    wasm_store: wasmtime::Store<WorkerState>,
319    instance: wit::Worker,
320    cursor: Option<LogSeq>,
321    metrics: Arc<metrics::Metrics>,
322}
323
324impl LoadedWorker {
325    pub async fn dispatch_event(
326        &mut self,
327        channel: u32,
328        event: &wit::Event,
329    ) -> Result<wit::Response, Error> {
330        self.instance
331            .call_handle(&mut self.wasm_store, channel, event)
332            .await?
333            .map_err(|err| Error::Handle(err.code, err.message))
334    }
335
336    async fn acknowledge_event(&mut self, channel: u32, event: &wit::Event) -> Result<(), Error> {
337        let result = self.dispatch_event(channel, event).await;
338
339        match result {
340            Ok(wit::Response::Acknowledge) => {
341                tracing::debug!("worker acknowledge");
342            }
343            Ok(_) => {
344                tracing::warn!("worker returned unexpected data");
345            }
346            Err(Error::Handle(code, message)) => {
347                tracing::warn!(code, message);
348            }
349            Err(e) => return Err(e),
350        }
351
352        Ok(())
353    }
354
355    async fn apply_block(&mut self, block: &Block) -> Result<(), Error> {
356        let worker_id = self.wasm_store.data().worker_id.clone();
357        let block_hash = block.hash();
358        let block_height = block.height();
359        let block_slot = block.slot();
360        for tx in block.txs() {
361            let tx_hash = tx.hash();
362            let channels = self.wasm_store.data().router.find_tx_targets(&tx);
363            if !channels.is_empty() {
364                let event = wit::Event::Tx(wit::balius::app::driver::Tx {
365                    block: wit::balius::app::driver::BlockRef {
366                        block_hash: block_hash.clone(),
367                        block_height,
368                        block_slot,
369                    },
370                    body: tx.to_bytes(),
371                    hash: tx_hash.clone(),
372                });
373                for channel in channels {
374                    self.metrics.tx_handled(&worker_id);
375                    self.acknowledge_event(channel, &event).await?;
376                }
377            }
378
379            for (index, utxo) in tx.outputs().into_iter().enumerate() {
380                let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
381                if channels.is_empty() {
382                    continue;
383                }
384
385                let event = wit::Event::Utxo(wit::balius::app::driver::Utxo {
386                    block: wit::balius::app::driver::BlockRef {
387                        block_hash: block_hash.clone(),
388                        block_height,
389                        block_slot,
390                    },
391                    body: utxo.to_bytes(),
392                    ref_: wit::balius::app::driver::TxoRef {
393                        tx_hash: tx_hash.clone(),
394                        txo_index: index as u32,
395                    },
396                });
397
398                for channel in channels {
399                    self.metrics.utxo_handled(&worker_id);
400                    self.acknowledge_event(channel, &event).await?;
401                }
402            }
403        }
404
405        Ok(())
406    }
407
408    async fn undo_block(&mut self, block: &Block) -> Result<(), Error> {
409        let worker_id = self.wasm_store.data().worker_id.clone();
410        let block_hash = block.hash();
411        let block_height = block.height();
412        let block_slot = block.slot();
413        for tx in block.txs() {
414            let tx_hash = tx.hash();
415            for (index, utxo) in tx.outputs().into_iter().enumerate().rev() {
416                let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
417                if channels.is_empty() {
418                    continue;
419                }
420
421                let event = wit::Event::UtxoUndo(wit::balius::app::driver::Utxo {
422                    block: wit::balius::app::driver::BlockRef {
423                        block_hash: block_hash.clone(),
424                        block_height,
425                        block_slot,
426                    },
427                    body: utxo.to_bytes(),
428                    ref_: wit::balius::app::driver::TxoRef {
429                        tx_hash: tx_hash.clone(),
430                        txo_index: index as u32,
431                    },
432                });
433
434                for channel in channels {
435                    self.metrics.undo_utxo_handled(&worker_id);
436                    self.acknowledge_event(channel, &event).await?;
437                }
438            }
439
440            let channels = self.wasm_store.data().router.find_tx_targets(&tx);
441            if !channels.is_empty() {
442                let event = wit::Event::TxUndo(wit::balius::app::driver::Tx {
443                    block: wit::balius::app::driver::BlockRef {
444                        block_hash: block_hash.clone(),
445                        block_height,
446                        block_slot,
447                    },
448                    body: tx.to_bytes(),
449                    hash: tx_hash.clone(),
450                });
451                for channel in channels {
452                    self.metrics.undo_tx_handled(&worker_id);
453                    self.acknowledge_event(channel, &event).await?;
454                }
455            }
456        }
457
458        Ok(())
459    }
460
461    async fn apply_chain(
462        &mut self,
463        undo_blocks: &Vec<Block>,
464        next_block: &Block,
465    ) -> Result<(), Error> {
466        for block in undo_blocks {
467            self.undo_block(block).await?;
468        }
469
470        self.apply_block(next_block).await
471    }
472}
473
474type WorkerMap = HashMap<String, LoadedWorker>;
475
476#[derive(Clone)]
477pub struct Runtime {
478    engine: wasmtime::Engine,
479    linker: wasmtime::component::Linker<WorkerState>,
480    loaded: Arc<Mutex<WorkerMap>>,
481
482    store: store::Store,
483    ledger: Option<ledgers::Ledger>,
484    logging: Option<logging::Logger>,
485    kv: Option<kv::Kv>,
486    sign: Option<sign::Signer>,
487    submit: Option<submit::Submit>,
488    http: Option<http::Http>,
489
490    metrics: Arc<metrics::Metrics>,
491}
492
493impl Runtime {
494    pub fn builder(store: store::Store) -> RuntimeBuilder {
495        RuntimeBuilder::new(store)
496    }
497
498    pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
499        let lowest_seq = self
500            .loaded
501            .lock()
502            .await
503            .values()
504            .flat_map(|w| w.cursor)
505            .min();
506
507        if let Some(seq) = lowest_seq {
508            debug!(lowest_seq, "found lowest seq");
509            return self.store.find_chain_point(seq);
510        }
511
512        Ok(None)
513    }
514
515    pub async fn register_worker(
516        &self,
517        id: &str,
518        wasm: &[u8],
519        config: serde_json::Value,
520    ) -> Result<(), Error> {
521        let component = wasmtime::component::Component::new(&self.engine, wasm)?;
522
523        let mut wasm_store = wasmtime::Store::new(
524            &self.engine,
525            WorkerState {
526                worker_id: id.to_owned(),
527                router: Router::new(),
528                ledger: self
529                    .ledger
530                    .as_ref()
531                    .map(|l| LedgerHost::new(id, l, &self.metrics)),
532                logging: self
533                    .logging
534                    .as_ref()
535                    .map(|kv| LoggerHost::new(id, kv, &self.metrics)),
536                kv: self
537                    .kv
538                    .as_ref()
539                    .map(|kv| KvHost::new(id, kv, &self.metrics)),
540                sign: self
541                    .sign
542                    .as_ref()
543                    .map(|s| SignerHost::new(id, s, &self.metrics)),
544                submit: self.submit.clone(),
545                http: self.http.clone(),
546            },
547        );
548
549        let instance =
550            wit::Worker::instantiate_async(&mut wasm_store, &component, &self.linker).await?;
551
552        let config = serde_json::to_vec(&config).unwrap();
553        instance.call_init(&mut wasm_store, &config).await?;
554
555        let cursor = self.store.get_worker_cursor(id)?;
556        debug!(cursor, id, "found cursor for worker");
557
558        self.loaded.lock().await.insert(
559            id.to_owned(),
560            LoadedWorker {
561                wasm_store,
562                instance,
563                cursor,
564                metrics: self.metrics.clone(),
565            },
566        );
567
568        Ok(())
569    }
570
571    /// Register worker into runtime using URL.
572    ///
573    /// Will download bytes from URL and interpret it as WASM. URL support is
574    /// determined by build features passed on to the [object_store](https://docs.rs/crate/object_store/latest) crate.
575    pub async fn register_worker_from_url(
576        &self,
577        id: &str,
578        url: &url::Url,
579        config: serde_json::Value,
580    ) -> Result<(), Error> {
581        let (store, path) = object_store::parse_url(url)?;
582        let bytes = store.get(&path).await?.bytes().await?;
583        self.register_worker(id, &bytes, config).await
584    }
585
586    pub async fn register_worker_from_file(
587        &self,
588        id: &str,
589        wasm_path: impl AsRef<Path>,
590        config: serde_json::Value,
591    ) -> Result<(), Error> {
592        let mut file = std::fs::File::open(wasm_path)?;
593        let mut buffer = Vec::new();
594        file.read_to_end(&mut buffer)?;
595        self.register_worker(id, &buffer, config).await
596    }
597
598    pub async fn remove_worker(&self, id: &str) -> Result<(), Error> {
599        match self.loaded.lock().await.remove(id) {
600            Some(_) => {
601                info!(worker = id, "Successfully removed worker from runtime.")
602            }
603            None => {
604                warn!(worker = id, "Worker not found, skipping remove.")
605            }
606        }
607
608        Ok(())
609    }
610
611    pub async fn handle_chain(
612        &mut self,
613        undo_blocks: &Vec<Block>,
614        next_block: &Block,
615    ) -> Result<(), Error> {
616        info!("applying block");
617
618        let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
619
620        let mut workers = self.loaded.lock().await;
621
622        let mut store_update = self.store.start_atomic_update(log_seq)?;
623
624        for (_, worker) in workers.iter_mut() {
625            worker.apply_chain(undo_blocks, next_block).await?;
626            store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
627        }
628
629        store_update.commit()?;
630
631        Ok(())
632    }
633
634    pub async fn handle_request(
635        &self,
636        worker_id: &str,
637        method: &str,
638        params: Vec<u8>,
639    ) -> Result<wit::Response, Error> {
640        let mut lock = self.loaded.lock().await;
641
642        let worker = lock
643            .get_mut(worker_id)
644            .ok_or(Error::WorkerNotFound(worker_id.to_string()))?;
645
646        let channel = worker
647            .wasm_store
648            .data()
649            .router
650            .find_request_target(method)?;
651
652        let evt = wit::Event::Request(params);
653
654        let result = worker.dispatch_event(channel, &evt).await;
655        self.metrics.request(worker_id, method, result.is_ok());
656        result
657    }
658}
659
660struct HasField<T>(std::marker::PhantomData<T>);
661impl<T: 'static> wasmtime::component::HasData for HasField<T> {
662    type Data<'a> = &'a mut T;
663}
664
665pub struct RuntimeBuilder {
666    store: store::Store,
667    engine: wasmtime::Engine,
668    linker: wasmtime::component::Linker<WorkerState>,
669    ledger: Option<ledgers::Ledger>,
670    logging: Option<logging::Logger>,
671    kv: Option<kv::Kv>,
672    sign: Option<sign::Signer>,
673    submit: Option<submit::Submit>,
674    http: Option<http::Http>,
675}
676
677impl RuntimeBuilder {
678    pub fn new(store: store::Store) -> Self {
679        let mut config = wasmtime::Config::new();
680        config.async_support(true);
681        let engine = wasmtime::Engine::new(&config).unwrap();
682        let mut linker = wasmtime::component::Linker::new(&engine);
683
684        wit::balius::app::driver::add_to_linker::<_, HasSelf<_>>(
685            &mut linker,
686            |state: &mut WorkerState| state,
687        )
688        .unwrap();
689
690        Self {
691            store,
692            engine,
693            linker,
694            ledger: None,
695            logging: None,
696            kv: None,
697            sign: None,
698            submit: None,
699            http: None,
700        }
701    }
702
703    pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
704        self.ledger = Some(ledger);
705
706        wit::balius::app::ledger::add_to_linker::<_, HasField<_>>(
707            &mut self.linker,
708            |state: &mut WorkerState| state.ledger.as_mut().unwrap(),
709        )
710        .unwrap();
711
712        self
713    }
714
715    pub fn with_kv(mut self, kv: kv::Kv) -> Self {
716        wit::balius::app::kv::add_to_linker::<_, HasField<_>>(
717            &mut self.linker,
718            |state: &mut WorkerState| state.kv.as_mut().unwrap(),
719        )
720        .unwrap();
721        self.kv = Some(kv);
722
723        self
724    }
725
726    pub fn with_logger(mut self, logging: logging::Logger) -> Self {
727        self.logging = Some(logging);
728
729        wit::balius::app::logging::add_to_linker::<_, HasField<_>>(
730            &mut self.linker,
731            |state: &mut WorkerState| state.logging.as_mut().unwrap(),
732        )
733        .unwrap();
734
735        self
736    }
737
738    pub fn with_signer(mut self, sign: sign::Signer) -> Self {
739        self.sign = Some(sign);
740
741        wit::balius::app::sign::add_to_linker::<_, HasField<_>>(
742            &mut self.linker,
743            |state: &mut WorkerState| state.sign.as_mut().unwrap(),
744        )
745        .unwrap();
746
747        self
748    }
749
750    pub fn with_submit(mut self, submit: submit::Submit) -> Self {
751        self.submit = Some(submit);
752
753        wit::balius::app::submit::add_to_linker::<_, HasField<_>>(
754            &mut self.linker,
755            |state: &mut WorkerState| state.submit.as_mut().unwrap(),
756        )
757        .unwrap();
758
759        self
760    }
761
762    pub fn with_http(mut self, http: http::Http) -> Self {
763        self.http = Some(http);
764        wit::balius::app::http::add_to_linker::<_, HasField<_>>(
765            &mut self.linker,
766            |state: &mut WorkerState| state.http.as_mut().unwrap(),
767        )
768        .unwrap();
769
770        self
771    }
772
773    pub fn build(self) -> Result<Runtime, Error> {
774        let mut this = self;
775        if this.logging.is_none() {
776            this = this.with_logger(logging::Logger::Silent);
777        }
778
779        let RuntimeBuilder {
780            store,
781            engine,
782            linker,
783            ledger,
784            logging,
785            kv,
786            sign,
787            submit,
788            http,
789        } = this;
790
791        Ok(Runtime {
792            metrics: Default::default(),
793            loaded: Default::default(),
794            engine,
795            linker,
796            store,
797            ledger,
798            logging,
799            kv,
800            sign,
801            submit,
802            http,
803        })
804    }
805}