balius_runtime/drivers/
chainsync.rs

1use serde::{Deserialize, Serialize};
2use tokio::select;
3use tokio_util::sync::CancellationToken;
4use tracing::{info, warn};
5use utxorpc::CardanoSyncClient;
6
7use crate::{Block, ChainPoint, Error, Runtime};
8
9impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
10    fn from(point: ChainPoint) -> Self {
11        match point {
12            ChainPoint::Cardano(x) => x.clone(),
13            _ => todo!(),
14        }
15    }
16}
17
18#[derive(Clone, Serialize, Deserialize, Debug)]
19pub struct Config {
20    pub endpoint_url: String,
21    pub api_key: String,
22}
23
24pub type UndoBlocks = Vec<Block>;
25pub type NextBlock = Block;
26
27/// Gather undo blocks from the tip until the next block is encountered.
28async fn gather_blocks(
29    tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
30) -> Result<(NextBlock, UndoBlocks), Error> {
31    let mut undos = vec![];
32
33    loop {
34        let event = tip.event().await?;
35
36        match event {
37            utxorpc::TipEvent::Apply(chain_block) => {
38                let next = Block::Cardano(chain_block.parsed.unwrap());
39                break Ok((next, undos));
40            }
41            utxorpc::TipEvent::Undo(chain_block) => {
42                undos.push(Block::Cardano(chain_block.parsed.unwrap()));
43            }
44            utxorpc::TipEvent::Reset(_) => unreachable!(),
45        }
46    }
47}
48
49pub async fn run(
50    config: Config,
51    mut runtime: Runtime,
52    cancel: CancellationToken,
53) -> Result<(), Error> {
54    let mut sync = utxorpc::ClientBuilder::new()
55        .uri(&config.endpoint_url)
56        .map_err(|e| Error::Driver(e.to_string()))?
57        .metadata("dmtr-api-key", config.api_key)
58        .map_err(|e| Error::Driver(e.to_string()))?
59        .build::<CardanoSyncClient>()
60        .await;
61
62    let cursor = runtime
63        .chain_cursor()
64        .await?
65        .map(Into::into)
66        .into_iter()
67        .collect();
68
69    info!(cursor = ?cursor, "found runtime cursor");
70
71    // TODO: handle disconnections and retry logic
72
73    let mut tip = sync
74        .follow_tip(cursor)
75        .await
76        .map_err(|e| Error::Driver(e.to_string()))?;
77
78    // confirm first event is a reset to the requested chain point
79    match tip.event().await? {
80        utxorpc::TipEvent::Reset(point) => {
81            warn!(
82                slot = point.index,
83                "TODO: check that reset is to the requested chain point"
84            );
85        }
86        _ => return Err(Error::Driver("unexpected event".to_string())),
87    }
88
89    info!("starting follow-tip loop");
90
91    loop {
92        select! {
93            _ = cancel.cancelled() => {
94                warn!("chain-sync driver cancelled");
95                break Ok(())
96            },
97            batch = gather_blocks(&mut tip) => {
98                let (next, undos) = batch?;
99                runtime.handle_chain(&undos, &next).await?;
100            }
101        }
102    }
103}