1use router::Router;
2use std::{collections::HashMap, path::Path, sync::Arc};
3use thiserror::Error;
4use tokio::sync::Mutex;
5use tracing::{debug, info, warn};
6use utxorpc::spec::sync::BlockRef;
7
/// Host-side bindings generated by wasmtime's component `bindgen!` macro from
/// the WIT definitions found under `./wit`.
///
/// The bindings are generated with async support and tracing enabled, which is
/// why the generated calls used in this file are awaited.
mod wit {
    wasmtime::component::bindgen!({
        path: "./wit",
        async: true,
        tracing: true,
    });
}
15
16mod router;
17mod store;
18
19pub mod drivers;
21pub mod kv;
22pub mod ledgers;
23pub mod submit;
24
25pub use store::Store;
26pub use wit::Response;
27
/// Identifier of a worker, as carried by [`Error::WorkerNotFound`].
pub type WorkerId = String;
29
/// Errors surfaced by the runtime.
#[derive(Error, Debug)]
pub enum Error {
    /// A failure reported by wasmtime.
    #[error("wasm error {0}")]
    Wasm(wasmtime::Error),

    /// A failure reported by redb. The more specific redb error types used in
    /// this file are converted into `redb::Error` before being wrapped here.
    #[error("store error {0}")]
    Store(redb::Error),

    /// No loaded worker is registered under the given id.
    #[error("worker not found '{0}'")]
    WorkerNotFound(WorkerId),

    /// The worker's handler returned an error; carries the code and message
    /// reported by the worker.
    #[error("worker failed to handle event (code: '{0}', message: '{1}')")]
    Handle(u32, String),

    /// No target could be found for a request.
    // NOTE(review): presumably produced by the router module — confirm there.
    #[error("no target available to solve request")]
    NoTarget,

    /// More than one target matched a request.
    // NOTE(review): presumably produced by the router module — confirm there.
    #[error("more than one target available to solve request")]
    AmbiguousTarget,

    /// An address could not be parsed by pallas. The underlying error is kept
    /// in the variant but is not part of the display message.
    #[error("address in block failed to parse")]
    BadAddress(pallas::ledger::addresses::Error),

    /// Free-form ledger failure message (producers are outside this file).
    #[error("ledger error: {0}")]
    Ledger(String),

    /// Free-form configuration failure message.
    #[error("config error: {0}")]
    Config(String),

    /// Free-form driver failure message (producers are outside this file).
    #[error("driver error: {0}")]
    Driver(String),
}
62
63impl From<wasmtime::Error> for Error {
64 fn from(value: wasmtime::Error) -> Self {
65 Self::Wasm(value)
66 }
67}
68
69impl From<redb::Error> for Error {
70 fn from(value: redb::Error) -> Self {
71 Self::Store(value)
72 }
73}
74
75impl From<redb::DatabaseError> for Error {
76 fn from(value: redb::DatabaseError) -> Self {
77 Self::Store(value.into())
78 }
79}
80
81impl From<redb::TransactionError> for Error {
82 fn from(value: redb::TransactionError) -> Self {
83 Self::Store(value.into())
84 }
85}
86
87impl From<redb::TableError> for Error {
88 fn from(value: redb::TableError) -> Self {
89 Self::Store(value.into())
90 }
91}
92
93impl From<redb::CommitError> for Error {
94 fn from(value: redb::CommitError) -> Self {
95 Self::Store(value.into())
96 }
97}
98
99impl From<redb::StorageError> for Error {
100 fn from(value: redb::StorageError) -> Self {
101 Self::Store(value.into())
102 }
103}
104
105impl From<pallas::ledger::addresses::Error> for Error {
106 fn from(value: pallas::ledger::addresses::Error) -> Self {
107 Self::BadAddress(value.into())
108 }
109}
110
/// Slot number of a block.
pub type BlockSlot = u64;
/// 32-byte hash, using pallas' fixed-size hash type.
pub type BlockHash = pallas::crypto::hash::Hash<32>;
113
/// A position on a chain; currently only Cardano is represented.
pub enum ChainPoint {
    /// Cardano point expressed as a utxorpc `BlockRef` (see
    /// [`Block::chain_point`], which fills it with the header slot and hash).
    Cardano(utxorpc::spec::sync::BlockRef),
}
117
/// Sequence number returned by the store's write-ahead step and tracked as
/// each worker's cursor.
pub type LogSeq = u64;
119
/// A transaction output; currently only Cardano is represented.
pub enum Utxo {
    /// Cardano output in its utxorpc representation.
    Cardano(utxorpc::spec::cardano::TxOutput),
}
123
124impl Utxo {
125 pub fn to_bytes(&self) -> Vec<u8> {
126 use prost::Message;
127
128 match self {
129 Self::Cardano(utxo) => utxo.encode_to_vec(),
130 }
131 }
132}
133
/// A transaction; currently only Cardano is represented.
pub enum Tx {
    /// Cardano transaction in its utxorpc representation.
    Cardano(utxorpc::spec::cardano::Tx),
}
137
138impl Tx {
139 pub fn hash(&self) -> Vec<u8> {
140 match self {
141 Self::Cardano(tx) => tx.hash.clone().into(),
142 }
143 }
144 pub fn outputs(&self) -> Vec<Utxo> {
145 match self {
146 Self::Cardano(tx) => tx
147 .outputs
148 .iter()
149 .map(|o| Utxo::Cardano(o.clone()))
150 .collect(),
151 }
152 }
153}
154
/// A block; currently only Cardano is represented.
#[derive(Debug, Clone)]
pub enum Block {
    /// Cardano block in its utxorpc representation.
    Cardano(utxorpc::spec::cardano::Block),
}
159
160impl Block {
161 pub fn hash(&self) -> Vec<u8> {
162 match self {
163 Self::Cardano(block) => block.header.as_ref().unwrap().hash.clone().into(),
164 }
165 }
166 pub fn height(&self) -> u64 {
167 match self {
168 Self::Cardano(block) => block.header.as_ref().unwrap().height,
169 }
170 }
171 pub fn txs(&self) -> Vec<Tx> {
172 match self {
173 Self::Cardano(block) => block
174 .body
175 .iter()
176 .flat_map(|b| b.tx.iter())
177 .map(|t| Tx::Cardano(t.clone()))
178 .collect(),
179 }
180 }
181
182 pub fn chain_point(&self) -> ChainPoint {
183 match self {
184 Self::Cardano(block) => ChainPoint::Cardano(BlockRef {
185 index: block.header.as_ref().unwrap().slot,
186 hash: block.header.as_ref().unwrap().hash.clone(),
187 }),
188 }
189 }
190
191 pub fn to_bytes(&self) -> Vec<u8> {
192 use prost::Message;
193
194 match self {
195 Self::Cardano(block) => block.encode_to_vec(),
196 }
197 }
198
199 pub fn from_bytes(data: &[u8]) -> Self {
200 use prost::Message;
201
202 Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap())
203 }
204}
205
/// Host-side state kept inside each worker's `wasmtime::Store`.
struct WorkerState {
    /// Id the worker was registered under.
    pub worker_id: String,
    /// Channel registrations made by the worker through the driver host
    /// interface; consulted to find the channels an event goes to.
    pub router: router::Router,
    /// Ledger handle copied from the runtime, if one was configured.
    pub ledger: Option<ledgers::Ledger>,
    /// Key-value handle copied from the runtime, if one was configured.
    pub kv: Option<kv::Kv>,
    /// Submit handle copied from the runtime, if one was configured.
    pub submit: Option<submit::Submit>,
}
213
214#[async_trait::async_trait]
215impl wit::balius::app::driver::Host for WorkerState {
216 async fn register_channel(
217 &mut self,
218 id: u32,
219 pattern: wit::balius::app::driver::EventPattern,
220 ) -> () {
221 self.router.register_channel(id, &pattern);
222 }
223}
224
/// A worker that has been instantiated and initialized.
struct LoadedWorker {
    /// wasmtime store owning the worker's host state.
    wasm_store: wasmtime::Store<WorkerState>,
    /// Instantiated component bindings used to call into the worker.
    instance: wit::Worker,
    /// Cursor read from the store when the worker was registered, if any.
    cursor: Option<LogSeq>,
}
230
231impl LoadedWorker {
232 pub async fn dispatch_event(
233 &mut self,
234 channel: u32,
235 event: &wit::Event,
236 ) -> Result<wit::Response, Error> {
237 self.instance
238 .call_handle(&mut self.wasm_store, channel, event)
239 .await?
240 .map_err(|err| Error::Handle(err.code, err.message))
241 }
242
243 async fn acknowledge_event(&mut self, channel: u32, event: &wit::Event) -> Result<(), Error> {
244 let result = self.dispatch_event(channel, event).await;
245
246 match result {
247 Ok(wit::Response::Acknowledge) => {
248 tracing::debug!("worker acknowledge");
249 }
250 Ok(_) => {
251 tracing::warn!("worker returned unexpected data");
252 }
253 Err(Error::Handle(code, message)) => {
254 tracing::warn!(code, message);
255 }
256 Err(e) => return Err(e),
257 }
258
259 Ok(())
260 }
261
262 async fn apply_block(&mut self, block: &Block) -> Result<(), Error> {
263 let block_hash = block.hash();
264 let block_height = block.height();
265 for tx in block.txs() {
266 let tx_hash = tx.hash();
267 for (index, utxo) in tx.outputs().into_iter().enumerate() {
268 let channels = self.wasm_store.data().router.find_utxo_targets(&utxo)?;
269 if channels.is_empty() {
270 continue;
271 }
272
273 let event = wit::Event::Utxo(wit::balius::app::driver::Utxo {
274 block: wit::balius::app::driver::BlockRef {
275 block_hash: block_hash.clone(),
276 block_height,
277 },
278 body: utxo.to_bytes(),
279 ref_: wit::balius::app::driver::TxoRef {
280 tx_hash: tx_hash.clone(),
281 txo_index: index as u32,
282 },
283 });
284
285 for channel in channels {
286 self.acknowledge_event(channel, &event).await?;
287 }
288 }
289 }
290
291 Ok(())
292 }
293
294 async fn undo_block(&mut self, block: &Block) -> Result<(), Error> {
295 let block_hash = block.hash();
296 let block_height = block.height();
297 for tx in block.txs() {
298 let tx_hash = tx.hash();
299 for (index, utxo) in tx.outputs().into_iter().enumerate() {
300 let channels = self.wasm_store.data().router.find_utxo_targets(&utxo)?;
301 if channels.is_empty() {
302 continue;
303 }
304
305 let event = wit::Event::UtxoUndo(wit::balius::app::driver::Utxo {
306 block: wit::balius::app::driver::BlockRef {
307 block_hash: block_hash.clone(),
308 block_height,
309 },
310 body: utxo.to_bytes(),
311 ref_: wit::balius::app::driver::TxoRef {
312 tx_hash: tx_hash.clone(),
313 txo_index: index as u32,
314 },
315 });
316
317 for channel in channels {
318 self.acknowledge_event(channel, &event).await?;
319 }
320 }
321 }
322
323 Ok(())
324 }
325
326 async fn apply_chain(
327 &mut self,
328 undo_blocks: &Vec<Block>,
329 next_block: &Block,
330 ) -> Result<(), Error> {
331 for block in undo_blocks {
332 self.undo_block(block).await?;
333 }
334
335 self.apply_block(next_block).await
336 }
337}
338
/// Loaded workers keyed by the id they were registered under.
type WorkerMap = HashMap<String, LoadedWorker>;
340
/// Hosts wasm workers and feeds them chain events and requests.
///
/// The loaded-worker map sits behind an `Arc`, so clones of a `Runtime` refer
/// to the same map.
#[derive(Clone)]
pub struct Runtime {
    /// wasmtime engine used to compile and instantiate worker components.
    engine: wasmtime::Engine,
    /// Linker holding the host interfaces made available to workers.
    linker: wasmtime::component::Linker<WorkerState>,
    /// Workers registered so far, guarded by an async mutex.
    loaded: Arc<Mutex<WorkerMap>>,

    /// Store used for the write-ahead log and worker cursors.
    store: store::Store,
    /// Optional ledger handle handed to each new worker.
    ledger: Option<ledgers::Ledger>,
    /// Optional key-value handle handed to each new worker.
    kv: Option<kv::Kv>,
    /// Optional submit handle handed to each new worker.
    submit: Option<submit::Submit>,
}
352
353impl Runtime {
354 pub fn builder(store: store::Store) -> RuntimeBuilder {
355 RuntimeBuilder::new(store)
356 }
357
358 pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
359 let lowest_seq = self
360 .loaded
361 .lock()
362 .await
363 .values()
364 .map(|w| w.cursor)
365 .flatten()
366 .min();
367
368 if let Some(seq) = lowest_seq {
369 debug!(lowest_seq, "found lowest seq");
370 return self.store.find_chain_point(seq);
371 }
372
373 Ok(None)
374 }
375
376 pub async fn register_worker(
377 &mut self,
378 id: &str,
379 wasm_path: impl AsRef<Path>,
380 config: serde_json::Value,
381 ) -> Result<(), Error> {
382 let component = wasmtime::component::Component::from_file(&self.engine, wasm_path)?;
383
384 let mut wasm_store = wasmtime::Store::new(
385 &self.engine,
386 WorkerState {
387 worker_id: id.to_owned(),
388 router: Router::new(),
389 ledger: self.ledger.clone(),
390 kv: self.kv.clone(),
391 submit: self.submit.clone(),
392 },
393 );
394
395 let instance =
396 wit::Worker::instantiate_async(&mut wasm_store, &component, &self.linker).await?;
397
398 let config = serde_json::to_vec(&config).unwrap();
399 instance.call_init(&mut wasm_store, &config).await?;
400
401 let cursor = self.store.get_worker_cursor(id)?;
402 debug!(cursor, id, "found cursor for worker");
403
404 self.loaded.lock().await.insert(
405 id.to_owned(),
406 LoadedWorker {
407 wasm_store,
408 instance,
409 cursor,
410 },
411 );
412
413 Ok(())
414 }
415
416 pub async fn handle_chain(
417 &mut self,
418 undo_blocks: &Vec<Block>,
419 next_block: &Block,
420 ) -> Result<(), Error> {
421 info!("applying block");
422
423 let log_seq = self.store.write_ahead(undo_blocks, next_block)?;
424
425 let mut workers = self.loaded.lock().await;
426
427 let mut store_update = self.store.start_atomic_update(log_seq)?;
428
429 for (_, worker) in workers.iter_mut() {
430 worker.apply_chain(undo_blocks, next_block).await?;
431 store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
432 }
433
434 store_update.commit()?;
435
436 Ok(())
437 }
438
439 pub async fn handle_request(
440 &self,
441 worker: &str,
442 method: &str,
443 params: Vec<u8>,
444 ) -> Result<wit::Response, Error> {
445 let mut lock = self.loaded.lock().await;
446
447 let worker = lock
448 .get_mut(worker)
449 .ok_or(Error::WorkerNotFound(worker.to_string()))?;
450
451 let channel = worker
452 .wasm_store
453 .data()
454 .router
455 .find_request_target(method)?;
456
457 let evt = wit::Event::Request(params);
458
459 worker.dispatch_event(channel, &evt).await
460 }
461}
462
/// Builder that assembles a [`Runtime`], optionally enabling the ledger,
/// key-value and submit host interfaces.
pub struct RuntimeBuilder {
    /// Store handed over to the runtime.
    store: store::Store,
    /// Engine created with async support enabled.
    engine: wasmtime::Engine,
    /// Linker that accumulates the host interfaces enabled so far.
    linker: wasmtime::component::Linker<WorkerState>,
    /// Ledger handle, set by `with_ledger`.
    ledger: Option<ledgers::Ledger>,
    /// Key-value handle, set by `with_kv`.
    kv: Option<kv::Kv>,
    /// Submit handle, set by `with_submit`.
    submit: Option<submit::Submit>,
}
471
472impl RuntimeBuilder {
473 pub fn new(store: store::Store) -> Self {
474 let mut config = wasmtime::Config::new();
475 config.async_support(true);
476 let engine = wasmtime::Engine::new(&config).unwrap();
477 let mut linker = wasmtime::component::Linker::new(&engine);
478
479 wit::balius::app::driver::add_to_linker(&mut linker, |state: &mut WorkerState| state)
480 .unwrap();
481
482 Self {
483 store,
484 engine,
485 linker,
486 ledger: None,
487 kv: None,
488 submit: None,
489 }
490 }
491
492 pub fn with_ledger(mut self, ledger: ledgers::Ledger) -> Self {
493 self.ledger = Some(ledger);
494
495 wit::balius::app::ledger::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
496 state.ledger.as_mut().unwrap()
497 })
498 .unwrap();
499
500 self
501 }
502
503 pub fn with_kv(mut self, kv: kv::Kv) -> Self {
504 self.kv = Some(kv);
505
506 wit::balius::app::kv::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
507 state.kv.as_mut().unwrap()
508 })
509 .unwrap();
510
511 self
512 }
513
514 pub fn with_submit(mut self, submit: submit::Submit) -> Self {
515 self.submit = Some(submit);
516
517 wit::balius::app::submit::add_to_linker(&mut self.linker, |state: &mut WorkerState| {
518 state.submit.as_mut().unwrap()
519 })
520 .unwrap();
521
522 self
523 }
524
525 pub fn build(self) -> Result<Runtime, Error> {
526 let RuntimeBuilder {
527 store,
528 engine,
529 linker,
530 ledger,
531 kv,
532 submit,
533 } = self;
534
535 Ok(Runtime {
536 loaded: Default::default(),
537 engine,
538 linker,
539 store,
540 ledger,
541 kv,
542 submit,
543 })
544 }
545}