balius_runtime/drivers/
chainsync.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use tokio::select;
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7use utxorpc::CardanoSyncClient;
8
9use crate::{Block, ChainPoint, Error, Runtime};
10
11impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
12    fn from(point: ChainPoint) -> Self {
13        match point {
14            ChainPoint::Cardano(x) => x.clone(),
15            #[allow(unreachable_patterns)]
16            _ => todo!(),
17        }
18    }
19}
20
21#[derive(Clone, Serialize, Deserialize, Debug)]
22pub struct Config {
23    pub endpoint_url: String,
24    pub headers: Option<HashMap<String, String>>,
25}
26
27pub type UndoBlocks = Vec<Block>;
28pub type NextBlock = Block;
29
30/// Gather undo blocks from the tip until the next block is encountered.
31async fn gather_blocks(
32    tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
33) -> Result<(NextBlock, UndoBlocks), Error> {
34    let mut undos = vec![];
35
36    loop {
37        let event = tip.event().await?;
38
39        match event {
40            utxorpc::TipEvent::Apply(chain_block) => {
41                let next = Block::Cardano(chain_block.parsed.unwrap());
42                break Ok((next, undos));
43            }
44            utxorpc::TipEvent::Undo(chain_block) => {
45                undos.push(Block::Cardano(chain_block.parsed.unwrap()));
46            }
47            utxorpc::TipEvent::Reset(_) => unreachable!(),
48        }
49    }
50}
51
52pub async fn run(
53    config: Config,
54    mut runtime: Runtime,
55    cancel: CancellationToken,
56) -> Result<(), Error> {
57    let mut builder = utxorpc::ClientBuilder::new()
58        .uri(&config.endpoint_url)
59        .map_err(|e| Error::Driver(e.to_string()))?;
60
61    if let Some(headers) = &config.headers {
62        for (k, v) in headers.iter() {
63            builder = builder
64                .metadata(k, v)
65                .map_err(|e| Error::Driver(e.to_string()))?;
66        }
67    }
68
69    let mut sync = builder.build::<CardanoSyncClient>().await;
70
71    let cursor = runtime
72        .chain_cursor()
73        .await?
74        .map(Into::into)
75        .into_iter()
76        .collect();
77
78    info!(cursor = ?cursor, "found runtime cursor");
79
80    // TODO: handle disconnections and retry logic
81
82    let mut tip = sync
83        .follow_tip(cursor)
84        .await
85        .map_err(|e| Error::Driver(e.to_string()))?;
86
87    // confirm first event is a reset to the requested chain point
88    match tip.event().await? {
89        utxorpc::TipEvent::Reset(point) => {
90            warn!(
91                slot = point.index,
92                "TODO: check that reset is to the requested chain point"
93            );
94        }
95        _ => return Err(Error::Driver("unexpected event".to_string())),
96    }
97
98    info!("starting follow-tip loop");
99
100    loop {
101        select! {
102            _ = cancel.cancelled() => {
103                warn!("chain-sync driver cancelled");
104                break Ok(())
105            },
106            batch = gather_blocks(&mut tip) => {
107                let (next, undos) = batch?;
108                runtime.handle_chain(&undos, &next).await?;
109            }
110        }
111    }
112}