use alloy::{
network::Ethereum,
providers::{Provider, RootProvider},
transports::TransportError,
};
use tokio::{
sync::{broadcast::error::RecvError, watch},
task::JoinHandle,
};
use tracing::{debug, error, trace};
#[derive(Debug)]
pub struct BlockWatcher {
block_number: watch::Sender<u64>,
host_provider: RootProvider<Ethereum>,
}
impl BlockWatcher {
pub fn new(host_provider: RootProvider<Ethereum>, initial: u64) -> Self {
Self {
block_number: watch::channel(initial).0,
host_provider,
}
}
pub async fn with_current_block(
host_provider: RootProvider<Ethereum>,
) -> Result<Self, TransportError> {
let block_number = host_provider.get_block_number().await?;
Ok(Self::new(host_provider, block_number))
}
pub fn subscribe(&self) -> SharedBlockNumber {
self.block_number.subscribe().into()
}
pub fn spawn(self) -> (SharedBlockNumber, JoinHandle<()>) {
(self.subscribe(), tokio::spawn(self.task_future()))
}
async fn task_future(self) {
let mut sub = match self.host_provider.subscribe_blocks().await {
Ok(sub) => sub,
Err(error) => {
error!(%error);
return;
}
};
debug!("subscribed to host chain blocks");
loop {
match sub.recv().await {
Ok(header) => {
let block_number = header.number;
self.block_number.send_replace(block_number);
trace!(block_number, "updated host block number");
}
Err(RecvError::Lagged(missed)) => {
debug!(%missed, "block subscription lagged");
}
Err(RecvError::Closed) => {
debug!("block subscription closed");
break;
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct SharedBlockNumber(watch::Receiver<u64>);
impl From<watch::Receiver<u64>> for SharedBlockNumber {
fn from(inner: watch::Receiver<u64>) -> Self {
Self(inner)
}
}
impl SharedBlockNumber {
pub fn get(&self) -> u64 {
*self.0.borrow()
}
pub async fn changed(&mut self) -> Result<u64, watch::error::RecvError> {
self.0.changed().await?;
Ok(*self.0.borrow_and_update())
}
pub async fn wait_until(&mut self, target: u64) -> Result<u64, watch::error::RecvError> {
self.0.wait_for(|&n| n >= target).await.map(|r| *r)
}
}