balius_runtime/drivers/
chainsync.rs1use serde::{Deserialize, Serialize};
2use tokio::select;
3use tokio_util::sync::CancellationToken;
4use tracing::{info, warn};
5use utxorpc::CardanoSyncClient;
6
7use crate::{Block, ChainPoint, Error, Runtime};
8
9impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
10 fn from(point: ChainPoint) -> Self {
11 match point {
12 ChainPoint::Cardano(x) => x.clone(),
13 _ => todo!(),
14 }
15 }
16}
17
18#[derive(Clone, Serialize, Deserialize, Debug)]
19pub struct Config {
20 pub endpoint_url: String,
21 pub api_key: String,
22}
23
24pub type UndoBlocks = Vec<Block>;
25pub type NextBlock = Block;
26
27async fn gather_blocks(
29 tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
30) -> Result<(NextBlock, UndoBlocks), Error> {
31 let mut undos = vec![];
32
33 loop {
34 let event = tip.event().await?;
35
36 match event {
37 utxorpc::TipEvent::Apply(chain_block) => {
38 let next = Block::Cardano(chain_block.parsed.unwrap());
39 break Ok((next, undos));
40 }
41 utxorpc::TipEvent::Undo(chain_block) => {
42 undos.push(Block::Cardano(chain_block.parsed.unwrap()));
43 }
44 utxorpc::TipEvent::Reset(_) => unreachable!(),
45 }
46 }
47}
48
49pub async fn run(
50 config: Config,
51 mut runtime: Runtime,
52 cancel: CancellationToken,
53) -> Result<(), Error> {
54 let mut sync = utxorpc::ClientBuilder::new()
55 .uri(&config.endpoint_url)
56 .map_err(|e| Error::Driver(e.to_string()))?
57 .metadata("dmtr-api-key", config.api_key)
58 .map_err(|e| Error::Driver(e.to_string()))?
59 .build::<CardanoSyncClient>()
60 .await;
61
62 let cursor = runtime
63 .chain_cursor()
64 .await?
65 .map(Into::into)
66 .into_iter()
67 .collect();
68
69 info!(cursor = ?cursor, "found runtime cursor");
70
71 let mut tip = sync
74 .follow_tip(cursor)
75 .await
76 .map_err(|e| Error::Driver(e.to_string()))?;
77
78 match tip.event().await? {
80 utxorpc::TipEvent::Reset(point) => {
81 warn!(
82 slot = point.index,
83 "TODO: check that reset is to the requested chain point"
84 );
85 }
86 _ => return Err(Error::Driver("unexpected event".to_string())),
87 }
88
89 info!("starting follow-tip loop");
90
91 loop {
92 select! {
93 _ = cancel.cancelled() => {
94 warn!("chain-sync driver cancelled");
95 break Ok(())
96 },
97 batch = gather_blocks(&mut tip) => {
98 let (next, undos) = batch?;
99 runtime.handle_chain(&undos, &next).await?;
100 }
101 }
102 }
103}