balius_runtime/
lib.rs

1use router::Router;
2use std::{collections::HashMap, path::Path, sync::Arc};
3use thiserror::Error;
4use tokio::sync::Mutex;
5use tracing::{debug, info, warn};
6use utxorpc::spec::sync::BlockRef;
7
// Host/guest bindings generated by wasmtime's component `bindgen!` macro from
// the WIT definitions found under `./wit`. The generated host traits are async
// (`async: true`) and tracing instrumentation is enabled (`tracing: true`).
mod wit {
    wasmtime::component::bindgen!({
        path: "./wit",
        async: true,
        tracing: true,
    });
}
15
16mod router;
17mod store;
18
19// implementations
20pub mod drivers;
21pub mod kv;
22pub mod ledgers;
23pub mod submit;
24
25pub use store::Store;
26pub use wit::Response;
27
/// Identifier of a worker; carried by [`Error::WorkerNotFound`].
pub type WorkerId = String;
29
30#[derive(Error, Debug)]
31pub enum Error {
32    #[error("wasm error {0}")]
33    Wasm(wasmtime::Error),
34
35    #[error("store error {0}")]
36    Store(redb::Error),
37
38    #[error("worker not found '{0}'")]
39    WorkerNotFound(WorkerId),
40
41    #[error("worker failed to handle event (code: '{0}', message: '{1}')")]
42    Handle(u32, String),
43
44    #[error("no target available to solve request")]
45    NoTarget,
46
47    #[error("more than one target available to solve request")]
48    AmbiguousTarget,
49
50    #[error("address in block failed to parse")]
51    BadAddress(pallas::ledger::addresses::Error),
52
53    #[error("ledger error: {0}")]
54    Ledger(String),
55
56    #[error("config error: {0}")]
57    Config(String),
58
59    #[error("driver error: {0}")]
60    Driver(String),
61}
62
63impl From<wasmtime::Error> for Error {
64    fn from(value: wasmtime::Error) -> Self {
65        Self::Wasm(value)
66    }
67}
68
69impl From<redb::Error> for Error {
70    fn from(value: redb::Error) -> Self {
71        Self::Store(value)
72    }
73}
74
75impl From<redb::DatabaseError> for Error {
76    fn from(value: redb::DatabaseError) -> Self {
77        Self::Store(value.into())
78    }
79}
80
81impl From<redb::TransactionError> for Error {
82    fn from(value: redb::TransactionError) -> Self {
83        Self::Store(value.into())
84    }
85}
86
87impl From<redb::TableError> for Error {
88    fn from(value: redb::TableError) -> Self {
89        Self::Store(value.into())
90    }
91}
92
93impl From<redb::CommitError> for Error {
94    fn from(value: redb::CommitError) -> Self {
95        Self::Store(value.into())
96    }
97}
98
99impl From<redb::StorageError> for Error {
100    fn from(value: redb::StorageError) -> Self {
101        Self::Store(value.into())
102    }
103}
104
105impl From<pallas::ledger::addresses::Error> for Error {
106    fn from(value: pallas::ledger::addresses::Error) -> Self {
107        Self::BadAddress(value.into())
108    }
109}
110
/// Slot number of a block, as an unsigned 64-bit integer.
pub type BlockSlot = u64;
/// Block hash, represented as a pallas `Hash<32>`.
pub type BlockHash = pallas::crypto::hash::Hash<32>;
113
114pub enum ChainPoint {
115    Cardano(utxorpc::spec::sync::BlockRef),
116}
117
/// Sequence number type used for worker cursors; `Runtime::chain_cursor`
/// passes the lowest one to the store to look up a chain point.
pub type LogSeq = u64;
119
120pub enum Utxo {
121    Cardano(utxorpc::spec::cardano::TxOutput),
122}
123
124impl Utxo {
125    pub fn to_bytes(&self) -> Vec<u8> {
126        use prost::Message;
127
128        match self {
129            Self::Cardano(utxo) => utxo.encode_to_vec(),
130        }
131    }
132}
133
134pub enum Tx {
135    Cardano(utxorpc::spec::cardano::Tx),
136}
137
138impl Tx {
139    pub fn hash(&self) -> Vec<u8> {
140        match self {
141            Self::Cardano(tx) => tx.hash.clone().into(),
142        }
143    }
144    pub fn outputs(&self) -> Vec<Utxo> {
145        match self {
146            Self::Cardano(tx) => tx
147                .outputs
148                .iter()
149                .map(|o| Utxo::Cardano(o.clone()))
150                .collect(),
151        }
152    }
153}
154
/// A block handled by the runtime.
#[derive(Debug, Clone)]
pub enum Block {
    /// A Cardano block in its utxorpc representation.
    Cardano(utxorpc::spec::cardano::Block),
}
159
160impl Block {
161    pub fn hash(&self) -> Vec<u8> {
162        match self {
163            Self::Cardano(block) => block.header.as_ref().unwrap().hash.clone().into(),
164        }
165    }
166    pub fn height(&self) -> u64 {
167        match self {
168            Self::Cardano(block) => block.header.as_ref().unwrap().height,
169        }
170    }
171    pub fn txs(&self) -> Vec<Tx> {
172        match self {
173            Self::Cardano(block) => block
174                .body
175                .iter()
176                .flat_map(|b| b.tx.iter())
177                .map(|t| Tx::Cardano(t.clone()))
178                .collect(),
179        }
180    }
181
182    pub fn chain_point(&self) -> ChainPoint {
183        match self {
184            Self::Cardano(block) => ChainPoint::Cardano(BlockRef {
185                index: block.header.as_ref().unwrap().slot,
186                hash: block.header.as_ref().unwrap().hash.clone(),
187            }),
188        }
189    }
190
191    pub fn to_bytes(&self) -> Vec<u8> {
192        use prost::Message;
193
194        match self {
195            Self::Cardano(block) => block.encode_to_vec(),
196        }
197    }
198
199    pub fn from_bytes(data: &[u8]) -> Self {
200        use prost::Message;
201
202        Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap())
203    }
204}
205
/// Host-side state of a single worker; stored as the data of the worker's
/// `wasmtime::Store` (see `Runtime::register_worker`).
struct WorkerState {
    /// Id the worker was registered under.
    pub worker_id: String,
    /// Channel registrations made by the worker through `register_channel`;
    /// queried to find the target channels of an event or request.
    pub router: router::Router,
    /// Ledger implementation cloned from the runtime, if one was configured.
    pub ledger: Option<ledgers::Ledger>,
    /// Key-value implementation cloned from the runtime, if one was configured.
    pub kv: Option<kv::Kv>,
    /// Submit implementation cloned from the runtime, if one was configured.
    pub submit: Option<submit::Submit>,
}
213
214#[async_trait::async_trait]
215impl wit::balius::app::driver::Host for WorkerState {
216    async fn register_channel(
217        &mut self,
218        id: u32,
219        pattern: wit::balius::app::driver::EventPattern,
220    ) -> () {
221        self.router.register_channel(id, &pattern);
222    }
223}
224
/// A worker that has been instantiated and initialized by
/// `Runtime::register_worker`.
struct LoadedWorker {
    /// wasmtime store owning the worker's `WorkerState`.
    wasm_store: wasmtime::Store<WorkerState>,
    /// Instantiated worker component.
    instance: wit::Worker,
    /// Cursor read from the store when the worker was registered.
    // NOTE(review): within this file the field is only assigned at
    // registration and never refreshed after `Runtime::handle_chain` commits —
    // confirm whether that is intended.
    cursor: Option<LogSeq>,
}
230
231impl LoadedWorker {
232    pub async fn dispatch_event(
233        &mut self,
234        channel: u32,
235        event: &wit::Event,
236    ) -> Result<wit::Response, Error> {
237        self.instance
238            .call_handle(&mut self.wasm_store, channel, event)
239            .await?
240            .map_err(|err| Error::Handle(err.code, err.message))
241    }
242
243    async fn acknowledge_event(&mut self, channel: u32, event: &wit::Event) -> Result<(), Error> {
244        let result = self.dispatch_event(channel, event).await;
245
246        match result {
247            Ok(wit::Response::Acknowledge) => {
248                tracing::debug!("worker acknowledge");
249            }
250            Ok(_) => {
251                tracing::warn!("worker returned unexpected data");
252            }
253            Err(Error::Handle(code, message)) => {
254                tracing::warn!(code, message);
255            }
256            Err(e) => return Err(e),
257        }
258
259        Ok(())
260    }
261
262    async fn apply_block(&mut self, block: &Block) -> Result<(), Error> {
263        let block_hash = block.hash();
264        let block_height = block.height();
265        for tx in block.txs() {
266            let tx_hash = tx.hash();
267            for (index, utxo) in tx.outputs().into_iter().enumerate() {
268                let channels = self.wasm_store.data().router.find_utxo_targets(&utxo)?;
269                if channels.is_empty() {
270                    continue;
271                }
272
273                let event = wit::Event::Utxo(wit::balius::app::driver::Utxo {
274                    block: wit::balius::app::driver::BlockRef {
275                        block_hash: block_hash.clone(),
276                        block_height,
277                    },
278                    body: utxo.to_bytes(),
279                    ref_: wit::balius::app::driver::TxoRef {
280                        tx_hash: tx_hash.clone(),
281                        txo_index: index as u32,
282                    },
283                });
284
285                for channel in channels {
286                    self.acknowledge_event(channel, &event).await?;
287                }
288            }
289        }
290
291        Ok(())
292    }
293
294    async fn undo_block(&mut self, block: &Block) -> Result<(), Error> {
295        let block_hash = block.hash();
296        let block_height = block.height();
297        for tx in block.txs() {
298            let tx_hash = tx.hash();
299            for (index, utxo) in tx.outputs().into_iter().enumerate() {
300                let channels = self.wasm_store.data().router.find_utxo_targets(&utxo)?;
301                if channels.is_empty() {
302                    continue;
303                }
304
305                let event = wit::Event::UtxoUndo(wit::balius::app::driver::Utxo {
306                    block: wit::balius::app::driver::BlockRef {
307                        block_hash: block_hash.clone(),
308                        block_height,
309                    },
310                    body: utxo.to_bytes(),
311                    ref_: wit::balius::app::driver::TxoRef {
312                        tx_hash: tx_hash.clone(),
313                        txo_index: index as u32,
314                    },
315                });
316
317                for channel in channels {
318                    self.acknowledge_event(channel, &event).await?;
319                }
320            }
321        }
322
323        Ok(())
324    }
325
326    async fn apply_chain(
327        &mut self,
328        undo_blocks: &Vec<Block>,
329        next_block: &Block,
330    ) -> Result<(), Error> {
331        for block in undo_blocks {
332            self.undo_block(block).await?;
333        }
334
335        self.apply_block(next_block).await
336    }
337}
338
/// Loaded workers keyed by the id they were registered under.
type WorkerMap = HashMap<String, LoadedWorker>;
340
/// Hosts wasm workers and routes chain events and requests to them.
///
/// The worker map sits behind an `Arc<Mutex<_>>`, so clones of a `Runtime`
/// share the same set of loaded workers.
#[derive(Clone)]
pub struct Runtime {
    /// Engine used to compile components and create per-worker stores.
    engine: wasmtime::Engine,
    /// Linker holding the host interfaces made available to workers.
    linker: wasmtime::component::Linker<WorkerState>,
    /// Workers registered so far, keyed by id.
    loaded: Arc<Mutex<WorkerMap>>,

    /// Store used for write-ahead logging, worker cursors and chain points.
    store: store::Store,
    /// Optional ledger implementation handed to each new worker.
    ledger: Option<ledgers::Ledger>,
    /// Optional key-value implementation handed to each new worker.
    kv: Option<kv::Kv>,
    /// Optional submit implementation handed to each new worker.
    submit: Option<submit::Submit>,
}
352
353impl Runtime {
354    pub fn builder(store: store::Store) -> RuntimeBuilder {
355        RuntimeBuilder::new(store)
356    }
357
358    pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
359        let lowest_seq = self
360            .loaded
361            .lock()
362            .await
363            .values()
364            .map(|w| w.cursor)
365            .flatten()
366            .min();
367
368        if let Some(seq) = lowest_seq {
369            debug!(lowest_seq, "found lowest seq");
370            return self.store.find_chain_point(seq);
371        }
372
373        Ok(None)
374    }
375
376    pub async fn register_worker(
377        &mut self,
378        id: &str,
379        wasm_path: impl AsRef<Path>,
380        config: serde_json::Value,
381    ) -> Result<(), Error> {
382        let component = wasmtime::component::Component::from_file(&self.engine, wasm_path)?;
383
384        let mut wasm_store = wasmtime::Store::new(
385            &self.engine,
386            WorkerState {
387                worker_id: id.to_owned(),
388                router: Router::new(),
389                ledger: self.ledger.clone(),
390                kv: self.kv.clone(),
391                submit: self.submit.clone(),
392            },
393        );
394
395        let instance =
396            wit::Worker::instantiate_async(&mut wasm_store, &component, &self.linker).await?;
397
398        let config = serde_json::to_vec(&config).unwrap();
399        instance.call_init(&mut wasm_store, &config).await?;
400
401        let cursor = self.store.get_worker_cursor(id)?;
402        debug!(cursor, id, "found cursor for worker");
403
404        self.loaded.lock().await.insert(
405            id.to_owned(),
406            LoadedWorker {
407                wasm_store,
408                instance,
409                cursor,
410            },
411        );
412
413        Ok(())
414    }
415
416    pub async fn handle_chain(
417        &mut self,
418        undo_blocks: &Vec<Block>,
419        next_block: &Block,
420    ) -> Result<(), Error> {
421        info!("applying block");
422
423        let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
424
425        let mut workers = self.loaded.lock().await;
426
427        let mut store_update = self.store.start_atomic_update(log_seq)?;
428
429        for (_, worker) in workers.iter_mut() {
430            worker.apply_chain(undo_blocks, next_block).await?;
431            store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
432        }
433
434        store_update.commit()?;
435
436        Ok(())
437    }
438
439    pub async fn handle_request(
440        &self,
441        worker: &str,
442        method: &str,
443        params: Vec<u8>,
444    ) -> Result<wit::Response, Error> {
445        let mut lock = self.loaded.lock().await;
446
447        let worker = lock
448            .get_mut(worker)
449            .ok_or(Error::WorkerNotFound(worker.to_string()))?;
450
451        let channel = worker
452            .wasm_store
453            .data()
454            .router
455            .find_request_target(method)?;
456
457        let evt = wit::Event::Request(params);
458
459        worker.dispatch_event(channel, &evt).await
460    }
461}
462
/// Builder for [`Runtime`]; collects the store, the wasmtime engine and linker,
/// and the optional host implementations exposed to workers.
pub struct RuntimeBuilder {
    /// Store handed over to the built runtime.
    store: store::Store,
    /// Engine created with async support enabled.
    engine: wasmtime::Engine,
    /// Linker to which host interfaces are added as they are configured.
    linker: wasmtime::component::Linker<WorkerState>,
    /// Ledger implementation, set by `with_ledger`.
    ledger: Option<ledgers::Ledger>,
    /// Key-value implementation, set by `with_kv`.
    kv: Option<kv::Kv>,
    /// Submit implementation, set by `with_submit`.
    submit: Option<submit::Submit>,
}
471
472impl RuntimeBuilder {
473    pub fn new(store: store::Store) -> Self {
474        let mut config = wasmtime::Config::new();
475        config.async_support(true);
476        let engine = wasmtime::Engine::new(&config).unwrap();
477        let mut linker = wasmtime::component::Linker::new(&engine);
478
479        wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut WorkerState| state)
480            .unwrap();
481
482        Self {
483            store,
484            engine,
485            linker,
486            ledger: None,
487            kv: None,
488            submit: None,
489        }
490    }
491
492    pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
493        self.ledger = Some(ledger);
494
495        wit::balius::app::ledger::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
496            state.ledger.as_mut().unwrap()
497        })
498        .unwrap();
499
500        self
501    }
502
503    pub fn with_kv(mut self, kv: kv::Kv) -> Self {
504        self.kv = Some(kv);
505
506        wit::balius::app::kv::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
507            state.kv.as_mut().unwrap()
508        })
509        .unwrap();
510
511        self
512    }
513
514    pub fn with_submit(mut self, submit: submit::Submit) -> Self {
515        self.submit = Some(submit);
516
517        wit::balius::app::submit::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
518            state.submit.as_mut().unwrap()
519        })
520        .unwrap();
521
522        self
523    }
524
525    pub fn build(self) -> Result<Runtime, Error> {
526        let RuntimeBuilder {
527            store,
528            engine,
529            linker,
530            ledger,
531            kv,
532            submit,
533        } = self;
534
535        Ok(Runtime {
536            loaded: Default::default(),
537            engine,
538            linker,
539            store,
540            ledger,
541            kv,
542            submit,
543        })
544    }
545}