use serde::{Deserialize, Serialize};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utxorpc::CardanoSyncClient;
use crate::{Block, ChainPoint, Error, Runtime};
/// Converts a runtime [`ChainPoint`] into the block reference type used by the
/// UTxO RPC sync API.
///
/// # Panics
///
/// Any variant other than `ChainPoint::Cardano` hits `todo!()` and panics.
impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
    fn from(point: ChainPoint) -> Self {
        match point {
            // `point` is owned, so the inner block reference can be moved out
            // directly; cloning it first would only produce a discarded copy.
            ChainPoint::Cardano(block_ref) => block_ref,
            _ => todo!(),
        }
    }
}
/// Connection settings consumed by [`run`] when building the UTxO RPC client.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Config {
/// URI handed to the client builder.
pub endpoint_url: String,
/// Value sent as the `dmtr-api-key` request metadata.
pub api_key: String,
}
/// Blocks taken from `Undo` tip events, in the order `gather_blocks` received them.
pub type UndoBlocks = Vec<Block>;
/// The block taken from the `Apply` tip event that ends a gathered batch.
pub type NextBlock = Block;
async fn gather_blocks(
tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
) -> Result<(NextBlock, UndoBlocks), Error> {
let mut undos = vec![];
loop {
let event = tip.event().await?;
match event {
utxorpc::TipEvent::Apply(chain_block) => {
let next = Block::Cardano(chain_block.parsed.unwrap());
break Ok((next, undos));
}
utxorpc::TipEvent::Undo(chain_block) => {
undos.push(Block::Cardano(chain_block.parsed.unwrap()));
}
utxorpc::TipEvent::Reset(_) => unreachable!(),
}
}
}
pub async fn run(
config: Config,
mut runtime: Runtime,
cancel: CancellationToken,
) -> Result<(), Error> {
let mut sync = utxorpc::ClientBuilder::new()
.uri(&config.endpoint_url)
.map_err(|e| Error::Driver(e.to_string()))?
.metadata("dmtr-api-key", config.api_key)
.map_err(|e| Error::Driver(e.to_string()))?
.build::<CardanoSyncClient>()
.await;
let cursor = runtime
.chain_cursor()
.await?
.map(Into::into)
.into_iter()
.collect();
info!(cursor = ?cursor, "found runtime cursor");
let mut tip = sync
.follow_tip(cursor)
.await
.map_err(|e| Error::Driver(e.to_string()))?;
match tip.event().await? {
utxorpc::TipEvent::Reset(point) => {
warn!(
slot = point.index,
"TODO: check that reset is to the requested chain point"
);
}
_ => return Err(Error::Driver("unexpected event".to_string())),
}
info!("starting follow-tip loop");
loop {
select! {
_ = cancel.cancelled() => {
warn!("chain-sync driver cancelled");
break Ok(())
},
batch = gather_blocks(&mut tip) => {
let (next, undos) = batch?;
runtime.handle_chain(&undos, &next).await?;
}
}
}
}