balius_runtime/
lib.rs

1use kv::KvHost;
2use logging::LoggerHost;
3use router::Router;
4use sign::SignerHost;
5use std::{collections::HashMap, io::Read, path::Path, sync::Arc};
6use thiserror::Error;
7use tokio::sync::Mutex;
8use tracing::{debug, info, warn};
9use utxorpc::spec::sync::BlockRef;
10
11pub mod wit {
12    wasmtime::component::bindgen!({
13        path: "./wit",
14        async: true,
15        tracing: true,
16    });
17}
18
19mod metrics;
20mod router;
21mod store;
22
23// implementations
24pub mod drivers;
25pub mod http;
26pub mod kv;
27pub mod ledgers;
28pub mod logging;
29pub mod sign;
30pub mod submit;
31
32pub use store::Store;
33pub use wit::Response;
34
35pub type WorkerId = String;
36
37#[derive(Error, Debug)]
38pub enum Error {
39    #[error("wasm error {0}")]
40    Wasm(wasmtime::Error),
41
42    #[error("store error {0}")]
43    Store(Box<redb::Error>),
44
45    #[error("worker not found '{0}'")]
46    WorkerNotFound(WorkerId),
47
48    #[error("worker failed to handle event (code: '{0}', message: '{1}')")]
49    Handle(u32, String),
50
51    #[error("no target available to solve request")]
52    NoTarget,
53
54    #[error("more than one target available to solve request")]
55    AmbiguousTarget,
56
57    #[error("address in block failed to parse")]
58    BadAddress(pallas::ledger::addresses::Error),
59
60    #[error("ledger error: {0}")]
61    Ledger(String),
62
63    #[error("config error: {0}")]
64    Config(String),
65
66    #[error("driver error: {0}")]
67    Driver(String),
68
69    #[error("failed to interact with WASM object: {0}")]
70    ObjectStoreError(object_store::Error),
71
72    #[error("failed to interact with local WASM file: {0}")]
73    IoError(std::io::Error),
74}
75
76impl From<wasmtime::Error> for Error {
77    fn from(value: wasmtime::Error) -> Self {
78        Self::Wasm(value)
79    }
80}
81
82impl From<redb::Error> for Error {
83    fn from(value: redb::Error) -> Self {
84        Self::Store(Box::new(value))
85    }
86}
87
88impl From<redb::DatabaseError> for Error {
89    fn from(value: redb::DatabaseError) -> Self {
90        Self::Store(Box::new(value.into()))
91    }
92}
93
94impl From<redb::TransactionError> for Error {
95    fn from(value: redb::TransactionError) -> Self {
96        Self::Store(Box::new(value.into()))
97    }
98}
99
100impl From<redb::TableError> for Error {
101    fn from(value: redb::TableError) -> Self {
102        Self::Store(Box::new(value.into()))
103    }
104}
105
106impl From<redb::CommitError> for Error {
107    fn from(value: redb::CommitError) -> Self {
108        Self::Store(Box::new(value.into()))
109    }
110}
111
112impl From<redb::StorageError> for Error {
113    fn from(value: redb::StorageError) -> Self {
114        Self::Store(Box::new(value.into()))
115    }
116}
117
118impl From<pallas::ledger::addresses::Error> for Error {
119    fn from(value: pallas::ledger::addresses::Error) -> Self {
120        Self::BadAddress(value)
121    }
122}
123
124impl From<std::io::Error> for Error {
125    fn from(value: std::io::Error) -> Self {
126        Self::IoError(value)
127    }
128}
129
130impl From<object_store::Error> for Error {
131    fn from(value: object_store::Error) -> Self {
132        match value {
133            object_store::Error::Generic { store, source } => {
134                Self::Config(format!("Failed to parse url: {}, {}", store, source))
135            }
136            object_store::Error::NotFound { path: _, source } => {
137                Self::WorkerNotFound(source.to_string())
138            }
139            x => Self::ObjectStoreError(x),
140        }
141    }
142}
143
144pub type BlockSlot = u64;
145pub type BlockHash = pallas::crypto::hash::Hash<32>;
146
147pub enum ChainPoint {
148    Cardano(utxorpc::spec::sync::BlockRef),
149}
150
151pub type LogSeq = u64;
152
153pub enum TxInput {
154    Cardano(utxorpc::spec::cardano::TxInput),
155}
156
157impl TxInput {
158    pub fn to_bytes(&self) -> Vec<u8> {
159        use prost::Message;
160
161        match self {
162            Self::Cardano(tx_input) => tx_input.encode_to_vec(),
163        }
164    }
165
166    pub fn address(&self) -> Option<Vec<u8>> {
167        match self {
168            Self::Cardano(tx_input) => tx_input.as_output.as_ref().map(|o| o.address.to_vec()),
169        }
170    }
171}
172
173pub enum Utxo {
174    Cardano(utxorpc::spec::cardano::TxOutput),
175}
176
177impl Utxo {
178    pub fn to_bytes(&self) -> Vec<u8> {
179        use prost::Message;
180
181        match self {
182            Self::Cardano(utxo) => utxo.encode_to_vec(),
183        }
184    }
185
186    pub fn address(&self) -> Option<Vec<u8>> {
187        match self {
188            Self::Cardano(utxo) => Some(utxo.address.to_vec()),
189        }
190    }
191}
192
193pub enum Tx {
194    Cardano(utxorpc::spec::cardano::Tx),
195}
196
197impl Tx {
198    pub fn hash(&self) -> Vec<u8> {
199        match self {
200            Self::Cardano(tx) => tx.hash.clone().into(),
201        }
202    }
203    pub fn inputs(&self) -> Vec<TxInput> {
204        match self {
205            Self::Cardano(tx) => tx
206                .inputs
207                .iter()
208                .map(|i| TxInput::Cardano(i.clone()))
209                .collect(),
210        }
211    }
212    pub fn outputs(&self) -> Vec<Utxo> {
213        match self {
214            Self::Cardano(tx) => tx
215                .outputs
216                .iter()
217                .map(|o| Utxo::Cardano(o.clone()))
218                .collect(),
219        }
220    }
221    pub fn to_bytes(&self) -> Vec<u8> {
222        use prost::Message;
223
224        match self {
225            Self::Cardano(tx) => tx.encode_to_vec(),
226        }
227    }
228}
229
230#[derive(Debug, Clone)]
231pub enum Block {
232    Cardano(utxorpc::spec::cardano::Block),
233}
234
235impl Block {
236    pub fn hash(&self) -> Vec<u8> {
237        match self {
238            Self::Cardano(block) => block.header.as_ref().unwrap().hash.clone().into(),
239        }
240    }
241    pub fn height(&self) -> u64 {
242        match self {
243            Self::Cardano(block) => block.header.as_ref().unwrap().height,
244        }
245    }
246    pub fn slot(&self) -> u64 {
247        match self {
248            Self::Cardano(block) => block.header.as_ref().unwrap().slot,
249        }
250    }
251    pub fn txs(&self) -> Vec<Tx> {
252        match self {
253            Self::Cardano(block) => block
254                .body
255                .iter()
256                .flat_map(|b| b.tx.iter())
257                .map(|t| Tx::Cardano(t.clone()))
258                .collect(),
259        }
260    }
261
262    pub fn chain_point(&self) -> ChainPoint {
263        match self {
264            Self::Cardano(block) => ChainPoint::Cardano(BlockRef {
265                index: block.header.as_ref().unwrap().slot,
266                hash: block.header.as_ref().unwrap().hash.clone(),
267            }),
268        }
269    }
270
271    pub fn to_bytes(&self) -> Vec<u8> {
272        use prost::Message;
273
274        match self {
275            Self::Cardano(block) => block.encode_to_vec(),
276        }
277    }
278
279    pub fn from_bytes(data: &[u8]) -> Self {
280        use prost::Message;
281
282        Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap())
283    }
284}
285
286struct WorkerState {
287    pub worker_id: String,
288    pub router: router::Router,
289    pub ledger: Option<ledgers::Ledger>,
290    pub logging: Option<logging::LoggerHost>,
291    pub kv: Option<kv::KvHost>,
292    pub sign: Option<sign::SignerHost>,
293    pub submit: Option<submit::Submit>,
294    pub http: Option<http::Http>,
295}
296
297#[async_trait::async_trait]
298impl wit::balius::app::driver::Host for WorkerState {
299    async fn register_channel(
300        &mut self,
301        id: u32,
302        pattern: wit::balius::app::driver::EventPattern,
303    ) -> () {
304        self.router.register_channel(id, &pattern);
305    }
306
307    async fn register_signer(&mut self, name: String, algorithm: String) -> Vec<u8> {
308        let signer = self.sign.as_mut().expect("No sign interface defined.");
309        signer.add_key(name, algorithm).await
310    }
311}
312
313struct LoadedWorker {
314    wasm_store: wasmtime::Store<WorkerState>,
315    instance: wit::Worker,
316    cursor: Option<LogSeq>,
317    metrics: Arc<metrics::Metrics>,
318}
319
320impl LoadedWorker {
321    pub async fn dispatch_event(
322        &mut self,
323        channel: u32,
324        event: &wit::Event,
325    ) -> Result<wit::Response, Error> {
326        self.instance
327            .call_handle(&mut self.wasm_store, channel, event)
328            .await?
329            .map_err(|err| Error::Handle(err.code, err.message))
330    }
331
332    async fn acknowledge_event(&mut self, channel: u32, event: &wit::Event) -> Result<(), Error> {
333        let result = self.dispatch_event(channel, event).await;
334
335        match result {
336            Ok(wit::Response::Acknowledge) => {
337                tracing::debug!("worker acknowledge");
338            }
339            Ok(_) => {
340                tracing::warn!("worker returned unexpected data");
341            }
342            Err(Error::Handle(code, message)) => {
343                tracing::warn!(code, message);
344            }
345            Err(e) => return Err(e),
346        }
347
348        Ok(())
349    }
350
351    async fn apply_block(&mut self, block: &Block) -> Result<(), Error> {
352        let worker_id = self.wasm_store.data().worker_id.clone();
353        let block_hash = block.hash();
354        let block_height = block.height();
355        let block_slot = block.slot();
356        for tx in block.txs() {
357            let tx_hash = tx.hash();
358            let channels = self.wasm_store.data().router.find_tx_targets(&tx);
359            if !channels.is_empty() {
360                let event = wit::Event::Tx(wit::balius::app::driver::Tx {
361                    block: wit::balius::app::driver::BlockRef {
362                        block_hash: block_hash.clone(),
363                        block_height,
364                        block_slot,
365                    },
366                    body: tx.to_bytes(),
367                    hash: tx_hash.clone(),
368                });
369                for channel in channels {
370                    self.metrics.tx_handled(&worker_id);
371                    self.acknowledge_event(channel, &event).await?;
372                }
373            }
374
375            for (index, utxo) in tx.outputs().into_iter().enumerate() {
376                let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
377                if channels.is_empty() {
378                    continue;
379                }
380
381                let event = wit::Event::Utxo(wit::balius::app::driver::Utxo {
382                    block: wit::balius::app::driver::BlockRef {
383                        block_hash: block_hash.clone(),
384                        block_height,
385                        block_slot,
386                    },
387                    body: utxo.to_bytes(),
388                    ref_: wit::balius::app::driver::TxoRef {
389                        tx_hash: tx_hash.clone(),
390                        txo_index: index as u32,
391                    },
392                });
393
394                for channel in channels {
395                    self.metrics.utxo_handled(&worker_id);
396                    self.acknowledge_event(channel, &event).await?;
397                }
398            }
399        }
400
401        Ok(())
402    }
403
404    async fn undo_block(&mut self, block: &Block) -> Result<(), Error> {
405        let worker_id = self.wasm_store.data().worker_id.clone();
406        let block_hash = block.hash();
407        let block_height = block.height();
408        let block_slot = block.slot();
409        for tx in block.txs() {
410            let tx_hash = tx.hash();
411            for (index, utxo) in tx.outputs().into_iter().enumerate().rev() {
412                let channels = self.wasm_store.data().router.find_utxo_targets(&utxo);
413                if channels.is_empty() {
414                    continue;
415                }
416
417                let event = wit::Event::UtxoUndo(wit::balius::app::driver::Utxo {
418                    block: wit::balius::app::driver::BlockRef {
419                        block_hash: block_hash.clone(),
420                        block_height,
421                        block_slot,
422                    },
423                    body: utxo.to_bytes(),
424                    ref_: wit::balius::app::driver::TxoRef {
425                        tx_hash: tx_hash.clone(),
426                        txo_index: index as u32,
427                    },
428                });
429
430                for channel in channels {
431                    self.metrics.undo_utxo_handled(&worker_id);
432                    self.acknowledge_event(channel, &event).await?;
433                }
434            }
435
436            let channels = self.wasm_store.data().router.find_tx_targets(&tx);
437            if !channels.is_empty() {
438                let event = wit::Event::TxUndo(wit::balius::app::driver::Tx {
439                    block: wit::balius::app::driver::BlockRef {
440                        block_hash: block_hash.clone(),
441                        block_height,
442                        block_slot,
443                    },
444                    body: tx.to_bytes(),
445                    hash: tx_hash.clone(),
446                });
447                for channel in channels {
448                    self.metrics.undo_tx_handled(&worker_id);
449                    self.acknowledge_event(channel, &event).await?;
450                }
451            }
452        }
453
454        Ok(())
455    }
456
457    async fn apply_chain(
458        &mut self,
459        undo_blocks: &Vec<Block>,
460        next_block: &Block,
461    ) -> Result<(), Error> {
462        for block in undo_blocks {
463            self.undo_block(block).await?;
464        }
465
466        self.apply_block(next_block).await
467    }
468}
469
470type WorkerMap = HashMap<String, LoadedWorker>;
471
472#[derive(Clone)]
473pub struct Runtime {
474    engine: wasmtime::Engine,
475    linker: wasmtime::component::Linker<WorkerState>,
476    loaded: Arc<Mutex<WorkerMap>>,
477
478    store: store::Store,
479    ledger: Option<ledgers::Ledger>,
480    logging: Option<logging::Logger>,
481    kv: Option<kv::Kv>,
482    sign: Option<sign::Signer>,
483    submit: Option<submit::Submit>,
484    http: Option<http::Http>,
485
486    metrics: Arc<metrics::Metrics>,
487}
488
489impl Runtime {
490    pub fn builder(store: store::Store) -> RuntimeBuilder {
491        RuntimeBuilder::new(store)
492    }
493
494    pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
495        let lowest_seq = self
496            .loaded
497            .lock()
498            .await
499            .values()
500            .flat_map(|w| w.cursor)
501            .min();
502
503        if let Some(seq) = lowest_seq {
504            debug!(lowest_seq, "found lowest seq");
505            return self.store.find_chain_point(seq);
506        }
507
508        Ok(None)
509    }
510
511    pub async fn register_worker(
512        &self,
513        id: &str,
514        wasm: &[u8],
515        config: serde_json::Value,
516    ) -> Result<(), Error> {
517        let component = wasmtime::component::Component::new(&self.engine, wasm)?;
518
519        let mut wasm_store = wasmtime::Store::new(
520            &self.engine,
521            WorkerState {
522                worker_id: id.to_owned(),
523                router: Router::new(),
524                ledger: self.ledger.clone(),
525                logging: self.logging.as_ref().map(|kv| LoggerHost::new(id, kv)),
526                kv: self
527                    .kv
528                    .as_ref()
529                    .map(|kv| KvHost::new(id, kv, &self.metrics)),
530                sign: self.sign.as_ref().map(|s| SignerHost::new(id, s)),
531                submit: self.submit.clone(),
532                http: self.http.clone(),
533            },
534        );
535
536        let instance =
537            wit::Worker::instantiate_async(&mut wasm_store, &component, &self.linker).await?;
538
539        let config = serde_json::to_vec(&config).unwrap();
540        instance.call_init(&mut wasm_store, &config).await?;
541
542        let cursor = self.store.get_worker_cursor(id)?;
543        debug!(cursor, id, "found cursor for worker");
544
545        self.loaded.lock().await.insert(
546            id.to_owned(),
547            LoadedWorker {
548                wasm_store,
549                instance,
550                cursor,
551                metrics: self.metrics.clone(),
552            },
553        );
554
555        Ok(())
556    }
557
558    /// Register worker into runtime using URL.
559    ///
560    /// Will download bytes from URL and interpret it as WASM. URL support is
561    /// determined by build features passed on to the [object_store](https://docs.rs/crate/object_store/latest) crate.
562    pub async fn register_worker_from_url(
563        &self,
564        id: &str,
565        url: &url::Url,
566        config: serde_json::Value,
567    ) -> Result<(), Error> {
568        let (store, path) = object_store::parse_url(url)?;
569        let bytes = store.get(&path).await?.bytes().await?;
570        self.register_worker(id, &bytes, config).await
571    }
572
573    pub async fn register_worker_from_file(
574        &self,
575        id: &str,
576        wasm_path: impl AsRef<Path>,
577        config: serde_json::Value,
578    ) -> Result<(), Error> {
579        let mut file = std::fs::File::open(wasm_path)?;
580        let mut buffer = Vec::new();
581        file.read_to_end(&mut buffer)?;
582        self.register_worker(id, &buffer, config).await
583    }
584
585    pub async fn remove_worker(&self, id: &str) -> Result<(), Error> {
586        match self.loaded.lock().await.remove(id) {
587            Some(_) => {
588                info!(worker = id, "Successfully removed worker from runtime.")
589            }
590            None => {
591                warn!(worker = id, "Worker not found, skipping remove.")
592            }
593        }
594
595        Ok(())
596    }
597
598    pub async fn handle_chain(
599        &mut self,
600        undo_blocks: &Vec<Block>,
601        next_block: &Block,
602    ) -> Result<(), Error> {
603        info!("applying block");
604
605        let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
606
607        let mut workers = self.loaded.lock().await;
608
609        let mut store_update = self.store.start_atomic_update(log_seq)?;
610
611        for (_, worker) in workers.iter_mut() {
612            worker.apply_chain(undo_blocks, next_block).await?;
613            store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
614        }
615
616        store_update.commit()?;
617
618        Ok(())
619    }
620
621    pub async fn handle_request(
622        &self,
623        worker_id: &str,
624        method: &str,
625        params: Vec<u8>,
626    ) -> Result<wit::Response, Error> {
627        let mut lock = self.loaded.lock().await;
628
629        let worker = lock
630            .get_mut(worker_id)
631            .ok_or(Error::WorkerNotFound(worker_id.to_string()))?;
632
633        let channel = worker
634            .wasm_store
635            .data()
636            .router
637            .find_request_target(method)?;
638
639        let evt = wit::Event::Request(params);
640
641        let result = worker.dispatch_event(channel, &evt).await;
642        self.metrics.request(worker_id, method, result.is_ok());
643        result
644    }
645}
646
647pub struct RuntimeBuilder {
648    store: store::Store,
649    engine: wasmtime::Engine,
650    linker: wasmtime::component::Linker<WorkerState>,
651    ledger: Option<ledgers::Ledger>,
652    logging: Option<logging::Logger>,
653    kv: Option<kv::Kv>,
654    sign: Option<sign::Signer>,
655    submit: Option<submit::Submit>,
656    http: Option<http::Http>,
657}
658
659impl RuntimeBuilder {
660    pub fn new(store: store::Store) -> Self {
661        let mut config = wasmtime::Config::new();
662        config.async_support(true);
663        let engine = wasmtime::Engine::new(&config).unwrap();
664        let mut linker = wasmtime::component::Linker::new(&engine);
665
666        wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut WorkerState| state)
667            .unwrap();
668
669        Self {
670            store,
671            engine,
672            linker,
673            ledger: None,
674            logging: None,
675            kv: None,
676            sign: None,
677            submit: None,
678            http: None,
679        }
680    }
681
682    pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
683        self.ledger = Some(ledger);
684
685        wit::balius::app::ledger::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
686            state.ledger.as_mut().unwrap()
687        })
688        .unwrap();
689
690        self
691    }
692
693    pub fn with_kv(mut self, kv: kv::Kv) -> Self {
694        wit::balius::app::kv::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
695            state.kv.as_mut().unwrap()
696        })
697        .unwrap();
698        self.kv = Some(kv);
699
700        self
701    }
702
703    pub fn with_logger(mut self, logging: logging::Logger) -> Self {
704        self.logging = Some(logging);
705
706        wit::balius::app::logging::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
707            state.logging.as_mut().unwrap()
708        })
709        .unwrap();
710
711        self
712    }
713
714    pub fn with_signer(mut self, sign: sign::Signer) -> Self {
715        self.sign = Some(sign);
716
717        wit::balius::app::sign::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
718            state.sign.as_mut().unwrap()
719        })
720        .unwrap();
721
722        self
723    }
724
725    pub fn with_submit(mut self, submit: submit::Submit) -> Self {
726        self.submit = Some(submit);
727
728        wit::balius::app::submit::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
729            state.submit.as_mut().unwrap()
730        })
731        .unwrap();
732
733        self
734    }
735
736    pub fn with_http(mut self, http: http::Http) -> Self {
737        self.http = Some(http);
738        wit::balius::app::http::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
739            state.http.as_mut().unwrap()
740        })
741        .unwrap();
742
743        self
744    }
745
746    pub fn build(self) -> Result<Runtime, Error> {
747        let mut this = self;
748        if this.logging.is_none() {
749            this = this.with_logger(logging::Logger::Silent);
750        }
751
752        let RuntimeBuilder {
753            store,
754            engine,
755            linker,
756            ledger,
757            logging,
758            kv,
759            sign,
760            submit,
761            http,
762        } = this;
763
764        Ok(Runtime {
765            metrics: Default::default(),
766            loaded: Default::default(),
767            engine,
768            linker,
769            store,
770            ledger,
771            logging,
772            kv,
773            sign,
774            submit,
775            http,
776        })
777    }
778}