balius_runtime/drivers/
chainsync.rs1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4use tokio::select;
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7use utxorpc::CardanoSyncClient;
8
9use crate::{Block, ChainPoint, Error, Runtime};
10
11impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
12 fn from(point: ChainPoint) -> Self {
13 match point {
14 ChainPoint::Cardano(x) => x.clone(),
15 #[allow(unreachable_patterns)]
16 _ => todo!(),
17 }
18 }
19}
20
21#[derive(Clone, Serialize, Deserialize, Debug)]
22pub struct Config {
23 pub endpoint_url: String,
24 pub headers: Option<HashMap<String, String>>,
25}
26
27pub type UndoBlocks = Vec<Block>;
28pub type NextBlock = Block;
29
30async fn gather_blocks(
32 tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
33) -> Result<(NextBlock, UndoBlocks), Error> {
34 let mut undos = vec![];
35
36 loop {
37 let event = tip.event().await?;
38
39 match event {
40 utxorpc::TipEvent::Apply(chain_block) => {
41 let next = Block::Cardano(chain_block.parsed.unwrap());
42 break Ok((next, undos));
43 }
44 utxorpc::TipEvent::Undo(chain_block) => {
45 undos.push(Block::Cardano(chain_block.parsed.unwrap()));
46 }
47 utxorpc::TipEvent::Reset(_) => unreachable!(),
48 }
49 }
50}
51
52pub async fn run(
53 config: Config,
54 mut runtime: Runtime,
55 cancel: CancellationToken,
56) -> Result<(), Error> {
57 let mut builder = utxorpc::ClientBuilder::new()
58 .uri(&config.endpoint_url)
59 .map_err(|e| Error::Driver(e.to_string()))?;
60
61 if let Some(headers) = &config.headers {
62 for (k, v) in headers.iter() {
63 builder = builder
64 .metadata(k, v)
65 .map_err(|e| Error::Driver(e.to_string()))?;
66 }
67 }
68
69 let mut sync = builder.build::<CardanoSyncClient>().await;
70
71 let cursor = runtime
72 .chain_cursor()
73 .await?
74 .map(Into::into)
75 .into_iter()
76 .collect();
77
78 info!(cursor = ?cursor, "found runtime cursor");
79
80 let mut tip = sync
83 .follow_tip(cursor)
84 .await
85 .map_err(|e| Error::Driver(e.to_string()))?;
86
87 match tip.event().await? {
89 utxorpc::TipEvent::Reset(point) => {
90 warn!(
91 slot = point.index,
92 "TODO: check that reset is to the requested chain point"
93 );
94 }
95 _ => return Err(Error::Driver("unexpected event".to_string())),
96 }
97
98 info!("starting follow-tip loop");
99
100 loop {
101 select! {
102 _ = cancel.cancelled() => {
103 warn!("chain-sync driver cancelled");
104 break Ok(())
105 },
106 batch = gather_blocks(&mut tip) => {
107 let (next, undos) = batch?;
108 runtime.handle_chain(&undos, &next).await?;
109 }
110 }
111 }
112}