use crate::utils::EthereumBlockLoader;
use alloy::{
providers::{Provider, ProviderBuilder, RootProvider},
pubsub::{Subscription, SubscriptionStream},
rpc::types::eth::Header,
transports::TransportResult,
};
use anyhow::{Context as _, Result};
use ethexe_common::{
Address, BlockHeader, ProtocolTimelines, SimpleBlockData, db::ConfigStorageRO,
};
use ethexe_db::Database;
use ethexe_ethereum::router::RouterQuery;
use futures::{FutureExt, Stream, StreamExt, future::BoxFuture, stream::FusedStream};
use gprimitives::H256;
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll, ready},
};
pub use sync::SyncError;
use sync::{ChainSync, SyncResult};
mod sync;
pub mod utils;
#[cfg(test)]
mod tests;
type HeadersSubscriptionFuture = BoxFuture<'static, TransportResult<Subscription<Header>>>;
type SyncFuture = future_timing::Timed<BoxFuture<'static, SyncResult<H256>>>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserverEvent {
Block(SimpleBlockData),
BlockSynced(H256),
}
pub struct ObserverConfig<'a> {
pub rpc: &'a str,
#[allow(rustdoc::private_intra_doc_links)]
pub max_sync_depth: Option<u32>,
}
#[derive(Clone, metrics_derive::Metrics)]
#[metrics(scope = "ethexe_observer")]
pub(crate) struct ObserverMetrics {
pub last_block_number: metrics::Gauge,
pub blocks_latency: metrics::Histogram,
pub block_syncing_latency: metrics::Histogram,
pub recoverable_sync_errors: metrics::Counter,
}
#[derive(Clone, Debug)]
struct RuntimeConfig {
timelines: ProtocolTimelines,
router_address: Address,
middleware_address: Address,
max_sync_depth: u32,
batched_sync_depth: u32,
finalization_period_blocks: u64,
}
pub struct ObserverService {
provider: RootProvider,
config: RuntimeConfig,
chain_sync: ChainSync,
metrics: ObserverMetrics,
headers_stream: SubscriptionStream<Header>,
block_sync_queue: VecDeque<Header>,
sync_future: Option<SyncFuture>,
subscription_future: Option<HeadersSubscriptionFuture>,
subscription_retry_attempt: u32,
}
impl Stream for ObserverService {
type Item = Result<ObserverEvent>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(future) = self.subscription_future.as_mut() {
match ready!(future.as_mut().poll(cx)) {
Ok(subscription) => {
self.headers_stream = subscription.into_stream();
self.subscription_future = None;
self.subscription_retry_attempt = 0;
}
Err(e) => {
log::warn!("observer: header subscription failed: {e:#}");
self.schedule_subscription_retry();
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
}
if let Poll::Ready(res) = self.headers_stream.poll_next_unpin(cx) {
let Some(header) = res else {
log::warn!("observer: header stream ended unexpectedly");
self.schedule_subscription_retry();
cx.waker().wake_by_ref();
return Poll::Pending;
};
self.metrics
.blocks_latency
.record(current_timestamp().saturating_sub(header.timestamp) as f64);
self.metrics.last_block_number.set(header.number as f64);
let data = SimpleBlockData {
hash: H256(header.hash.0),
header: BlockHeader {
height: header.number as u32,
timestamp: header.timestamp,
parent_hash: H256(header.parent_hash.0),
},
};
log::trace!("Received a new block: {data:?}");
self.block_sync_queue.push_front(header);
return Poll::Ready(Some(Ok(ObserverEvent::Block(data))));
}
if self.sync_future.is_none()
&& let Some(header) = self.block_sync_queue.pop_back()
{
self.sync_future = Some(future_timing::timed(
self.chain_sync.clone().sync(header).boxed(),
));
}
if let Some(fut) = self.sync_future.as_mut()
&& let Poll::Ready(timing_result) = fut.poll_unpin(cx)
{
let (timing, result) = timing_result.into_parts();
self.metrics
.block_syncing_latency
.record((timing.busy() + timing.idle()).as_secs_f64());
self.sync_future = None;
match result {
Ok(hash) => {
return Poll::Ready(Some(Ok(ObserverEvent::BlockSynced(hash))));
}
Err(SyncError::RpcError(err)) => {
log::warn!("observer: RPC error, retrying on next head: {err:#}");
self.metrics.recoverable_sync_errors.increment(1);
cx.waker().wake_by_ref();
return Poll::Pending;
}
Err(SyncError::Fatal(err)) => {
return Poll::Ready(Some(Err(err)));
}
}
}
Poll::Pending
}
}
impl FusedStream for ObserverService {
fn is_terminated(&self) -> bool {
false
}
}
impl ObserverService {
pub async fn new(db: Database, config: ObserverConfig<'_>) -> Result<Self> {
let ObserverConfig {
rpc,
max_sync_depth,
} = config;
let router_address = db.config().router_address;
let router_query = RouterQuery::new(rpc, router_address).await?;
let middleware_address = router_query.middleware_address().await?;
let provider = ProviderBuilder::default()
.connect(rpc)
.await
.context("failed to create ethereum provider")?;
let headers_stream = provider
.subscribe_blocks()
.await
.context("failed to subscribe blocks")?
.into_stream();
let config = RuntimeConfig {
timelines: db.config().timelines,
router_address,
middleware_address,
max_sync_depth: max_sync_depth.unwrap_or(u32::MAX),
batched_sync_depth: 2,
finalization_period_blocks: 64,
};
let chain_sync = ChainSync::new(db, config.clone(), provider.clone());
Ok(Self {
provider,
config,
chain_sync,
sync_future: None,
block_sync_queue: VecDeque::new(),
metrics: ObserverMetrics::default(),
subscription_future: None,
subscription_retry_attempt: 0,
headers_stream,
})
}
pub fn provider(&self) -> &RootProvider {
&self.provider
}
pub fn block_loader(&self) -> EthereumBlockLoader {
EthereumBlockLoader::new(self.provider.clone(), self.config.router_address)
}
pub fn router_query(&self) -> RouterQuery {
RouterQuery::from_provider(self.config.router_address, self.provider.clone())
}
fn schedule_subscription_retry(&mut self) {
let attempt = self.subscription_retry_attempt.saturating_add(1);
self.subscription_retry_attempt = attempt;
let backoff = std::time::Duration::from_millis(
(500u64.saturating_mul(1u64 << attempt.min(6))).min(30_000),
);
log::warn!("observer: re-subscribing to headers (attempt {attempt}, after {backoff:?})");
self.metrics.recoverable_sync_errors.increment(1);
let provider = self.provider().clone();
self.subscription_future = Some(
async move {
tokio::time::sleep(backoff).await;
provider.subscribe_blocks().await
}
.boxed(),
);
}
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}