use crate::{
Config,
log::EthEventLog,
ports::RelayerDb,
service::state::EthLocal,
};
use async_trait::async_trait;
use core::time::Duration;
use ethers_core::types::{
Filter,
Log,
SyncingStatus,
ValueOrArray,
};
use ethers_providers::{
Http,
Middleware,
Provider,
ProviderError,
Quorum,
QuorumProvider,
WeightedProvider,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
ServiceRunner,
StateWatcher,
TaskNextAction,
};
use fuel_core_types::{
blockchain::primitives::DaBlockHeight,
entities::Message,
};
use futures::StreamExt;
use std::convert::TryInto;
use tokio::sync::watch;
use self::{
get_logs::*,
run::RelayerData,
};
mod get_logs;
mod run;
mod state;
mod syncing;
#[cfg(test)]
mod test;
/// The relayer's synchronization state relative to the DA (eth) layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncState {
    /// DA blocks up to the contained height have been processed, but the
    /// relayer has not yet caught up with the remote finalized height.
    PartiallySynced(DaBlockHeight),
    /// The relayer has processed all finalized DA blocks up to the
    /// contained height.
    Synced(DaBlockHeight),
}
impl SyncState {
    /// Returns the DA block height carried by either variant.
    pub fn da_block_height(&self) -> DaBlockHeight {
        // Both variants carry a height, so an irrefutable or-pattern
        // extracts it without a `match` expression.
        let (Self::PartiallySynced(height) | Self::Synced(height)) = self;
        *height
    }

    /// `true` only when the relayer is fully synced with the DA layer.
    pub fn is_synced(&self) -> bool {
        match self {
            Self::Synced(_) => true,
            Self::PartiallySynced(_) => false,
        }
    }
}
/// Receiving half of the sync-state notification channel.
type Synced = watch::Receiver<SyncState>;
/// Sending half used to broadcast sync-state changes to subscribers.
type NotifySynced = watch::Sender<SyncState>;

/// The relayer service backed by a quorum over HTTP eth providers.
pub type Service<D> = CustomizableService<Provider<QuorumProvider<Http>>, D>;
/// Service type generic over the eth middleware, used by tests to inject mocks.
type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;
/// Handle shared with other services to observe the relayer's sync state.
#[derive(Clone)]
pub struct SharedState {
    // Subscriber side of the watch channel owned by the task.
    synced: Synced,
}
/// Non-initialized relayer task; converted into a running [`Task`] by the
/// service runner via `RunnableService::into_task`.
pub struct NotInitializedTask<P, D> {
    // Sender side broadcasting sync-state changes to `SharedState` handles.
    synced: NotifySynced,
    // Middleware used to talk to the eth node(s).
    eth_node: P,
    // Storage for downloaded messages and the finalized DA height.
    database: D,
    config: Config,
    // When `false` (tests), the task stops on the first error instead of retrying.
    retry_on_error: bool,
}
/// Outcome of a single log-fetching RPC call, used to drive adaptive
/// page sizing.
// `Debug` (and friends) derived so the public type can be logged and
// compared in tests; the enum is `Copy`-cheap plain data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcOutcome {
    /// The call succeeded and returned `logs_downloaded` logs.
    Success { logs_downloaded: u64 },
    /// The call failed (e.g. provider error or timeout).
    Error,
}
/// Strategy for choosing how many DA blocks to request per log-fetch RPC.
pub trait PageSizer {
    /// Feeds the outcome of the latest RPC call into the sizing strategy.
    fn update(&mut self, outcome: RpcOutcome);
    /// Current page size (in DA blocks) to use for the next RPC call.
    fn page_size(&self) -> u64;
}
/// A [`PageSizer`] that shrinks the page on failures/oversized responses and
/// grows it back after a streak of successful calls.
pub struct AdaptivePageSizer {
    // Current page size in DA blocks; never below 1.
    current: u64,
    // Upper bound the page size may grow back to.
    max: u64,
    // Length of the current success streak; reset on shrink or growth.
    successful_rpc_calls: u64,
    // Number of consecutive successes required before attempting growth.
    grow_threshold: u64,
    // A success returning more logs than this still counts as "too big".
    max_logs_per_rpc: u64,
}
impl AdaptivePageSizer {
    /// Creates a sizer starting at `current` DA blocks per page, never
    /// exceeding `max`.
    ///
    /// The page grows after `grow_threshold` consecutive successful calls
    /// and shrinks whenever a call fails or returns more than
    /// `max_logs_per_rpc` logs.
    fn new(current: u64, max: u64, grow_threshold: u64, max_logs_per_rpc: u64) -> Self {
        Self {
            // A fresh sizer has no success streak yet.
            successful_rpc_calls: 0,
            current,
            max,
            grow_threshold,
            max_logs_per_rpc,
        }
    }
}
impl PageSizer for AdaptivePageSizer {
fn update(&mut self, outcome: RpcOutcome) {
const PAGE_GROW_FACTOR_NUM: u64 = 125;
const PAGE_GROW_FACTOR_DEN: u64 = 100;
const PAGE_SHRINK_FACTOR: u64 = 2;
match outcome {
RpcOutcome::Error => {
self.successful_rpc_calls = 0;
self.current = (self.current / PAGE_SHRINK_FACTOR).max(1);
}
RpcOutcome::Success { logs_downloaded }
if logs_downloaded > self.max_logs_per_rpc =>
{
self.successful_rpc_calls = 0;
self.current = (self.current / PAGE_SHRINK_FACTOR).max(1);
}
_ => {
self.successful_rpc_calls = self.successful_rpc_calls.saturating_add(1);
if self.successful_rpc_calls >= self.grow_threshold
&& self.current < self.max
{
let grown = self.current.saturating_mul(PAGE_GROW_FACTOR_NUM)
/ PAGE_GROW_FACTOR_DEN;
self.current = if grown > self.current {
grown.min(self.max)
} else {
(self.current.saturating_add(1)).min(self.max)
};
self.successful_rpc_calls = 0;
}
}
}
}
fn page_size(&self) -> u64 {
self.current
}
}
/// The initialized, running relayer task.
pub struct Task<P, D, S> {
    // Sender side broadcasting sync-state changes to `SharedState` handles.
    synced: NotifySynced,
    // Middleware used to talk to the eth node(s).
    eth_node: P,
    // Storage for downloaded messages and the finalized DA height.
    database: D,
    config: Config,
    // Watcher used to abort long-running awaits on service shutdown.
    shutdown: StateWatcher,
    // When `false` (tests), the task stops on the first error instead of retrying.
    retry_on_error: bool,
    // Adaptive sizing of the DA-block range requested per log-fetch RPC.
    page_sizer: S,
}
impl<P, D> NotInitializedTask<P, D>
where
    D: RelayerDb + 'static,
{
    /// Creates a not-yet-initialized relayer task.
    fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
        // Resume from the finalized DA height recorded in storage; on a
        // fresh database, start one block before the contract deploy height.
        let da_block_height = match database.get_finalized_da_height() {
            Some(height) => height,
            None => config.da_deploy_height.0.saturating_sub(1).into(),
        };
        let (synced, _) = watch::channel(SyncState::PartiallySynced(da_block_height));
        Self {
            synced,
            eth_node,
            database,
            config,
            retry_on_error,
        }
    }
}
impl<P, D, S> RelayerData for Task<P, D, S>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
    S: PageSizer + 'static + Send + Sync,
{
    /// Blocks until the eth node reports it is done syncing, or returns an
    /// error early when the service receives a stop signal.
    async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
        let mut shutdown = self.shutdown.clone();
        // `biased` checks the shutdown branch first on every poll so a stop
        // signal is never starved by the syncing future.
        tokio::select! {
            biased;
            _ = shutdown.while_started() => {
                Err(anyhow::anyhow!("The relayer got a stop signal"))
            },
            result = syncing::wait_if_eth_syncing(
                &self.eth_node,
                self.config.syncing_call_frequency,
                self.config.syncing_log_frequency,
            ) => {
                result
            }
        }
    }

    /// Downloads logs covering `eth_sync_gap` and writes them to the database.
    async fn download_logs(
        &mut self,
        eth_sync_gap: &state::EthSyncGap,
    ) -> anyhow::Result<()> {
        let logs = download_logs(
            eth_sync_gap,
            self.config.eth_v2_listening_contracts.clone(),
            &self.eth_node,
            &mut self.page_sizer,
        );
        // Stop consuming the stream as soon as the service is asked to stop.
        let logs = logs.take_until(self.shutdown.while_started());
        write_logs(&mut self.database, logs).await
    }

    /// Publishes a new sync state to subscribers, but only when it actually
    /// changed, avoiding spurious wake-ups of waiters.
    fn update_synced(&self, state: &state::EthState) {
        self.synced.send_if_modified(|last_state| {
            let new_sync = state.sync_state();
            if new_sync != *last_state {
                *last_state = new_sync;
                true
            } else {
                false
            }
        });
    }

    /// The finalized DA height recorded in storage, if any.
    fn storage_da_block_height(&self) -> Option<u64> {
        self.database
            .get_finalized_da_height()
            .map(|height| height.into())
    }
}
#[async_trait]
impl<P, D> RunnableService for NotInitializedTask<P, D>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
{
    const NAME: &'static str = "Relayer";

    type SharedData = SharedState;
    type Task = Task<P, D, AdaptivePageSizer>;
    type TaskParams = ();

    fn shared_data(&self) -> Self::SharedData {
        let synced = self.synced.subscribe();
        SharedState { synced }
    }

    /// Converts the not-initialized task into a running [`Task`], wiring in
    /// the shutdown watcher and the adaptive page sizer.
    async fn into_task(
        mut self,
        watcher: &StateWatcher,
        _: Self::TaskParams,
    ) -> anyhow::Result<Self::Task> {
        // Named to avoid a magic `50` at the call site: number of consecutive
        // successful RPC calls required before the sizer attempts to grow.
        const PAGE_SIZER_GROW_THRESHOLD: u64 = 50;

        let shutdown = watcher.clone();
        let NotInitializedTask {
            synced,
            eth_node,
            database,
            config,
            retry_on_error,
        } = self;
        // The configured page size is both the starting point and the upper
        // bound: the sizer may shrink below it and later recover up to it.
        let page_sizer = AdaptivePageSizer::new(
            config.log_page_size,
            config.log_page_size,
            PAGE_SIZER_GROW_THRESHOLD,
            config.max_logs_per_rpc,
        );
        let task = Task {
            synced,
            eth_node,
            database,
            shutdown,
            retry_on_error,
            page_sizer,
            config,
        };
        Ok(task)
    }
}
impl<P, D, S> RunnableTask for Task<P, D, S>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
    S: PageSizer + 'static + Send + Sync,
{
    /// Runs one relayer iteration and decides whether to continue, retry,
    /// or stop the service loop.
    async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
        let now = tokio::time::Instant::now();

        let result = run::run(self).await;

        // While still catching up (and healthy) iterate immediately; once
        // synced, or after an error, pad the iteration out to the minimum
        // duration so the loop does not hammer the eth node.
        // Fixed: was the bitwise `|`, which always evaluated both operands;
        // `||` short-circuits so the sync state is only read when needed.
        if self.shutdown.borrow_and_update().started()
            && (result.is_err() || self.synced.borrow().is_synced())
        {
            tokio::time::sleep(
                self.config
                    .sync_minimum_duration
                    .saturating_sub(now.elapsed()),
            )
            .await;
        }

        match result {
            Err(err) => {
                if !self.retry_on_error {
                    // Test configuration: surface the error and stop.
                    tracing::error!("Exiting due to Error in relayer task: {}", err);
                    TaskNextAction::Stop
                } else {
                    TaskNextAction::ErrorContinue(err)
                }
            }
            _ => TaskNextAction::Continue,
        }
    }

    async fn shutdown(self) -> anyhow::Result<()> {
        // No resources to release beyond what `Drop` already handles.
        Ok(())
    }
}
impl SharedState {
    /// Resolves once the relayer reports [`SyncState::Synced`].
    pub async fn await_synced(&self) -> anyhow::Result<()> {
        let mut rx = self.synced.clone();
        // `borrow_and_update` marks the current value as seen so `changed`
        // only wakes on genuinely new states.
        while !rx.borrow_and_update().is_synced() {
            rx.changed().await?;
        }
        Ok(())
    }

    /// Resolves once the observed DA height is at least `height`.
    pub async fn await_at_least_synced(
        &self,
        height: &DaBlockHeight,
    ) -> anyhow::Result<()> {
        let mut rx = self.synced.clone();
        while rx.borrow_and_update().da_block_height() < *height {
            rx.changed().await?;
        }
        Ok(())
    }

    /// The most recently observed finalized DA height.
    pub fn get_finalized_da_height(&self) -> DaBlockHeight {
        self.synced.borrow().da_block_height()
    }
}
impl<P, D, S> state::EthRemote for Task<P, D, S>
where
    P: Middleware<Error = ProviderError>,
    D: RelayerDb + 'static,
    S: PageSizer + 'static + Send + Sync,
{
    /// Fetches the latest finalized block number from the eth node, aborting
    /// early when the service receives a stop signal.
    async fn finalized(&self) -> anyhow::Result<u64> {
        let mut shutdown = self.shutdown.clone();
        // `biased` checks the shutdown branch first so a stop signal is
        // never starved by a slow RPC call.
        tokio::select! {
            biased;
            _ = shutdown.while_started() => {
                Err(anyhow::anyhow!("The relayer got a stop signal"))
            },
            block = self.eth_node.get_block(ethers_core::types::BlockNumber::Finalized) => {
                // `ok_or_else` defers building (and allocating) the error to
                // the rare "pending" case — `ok_or(anyhow!(..))` paid that
                // cost on every successful call.
                let block_number = block.map_err(|err| anyhow::anyhow!("failed to get block from Eth node: {err:?}"))?
                    .and_then(|block| block.number)
                    .ok_or_else(|| anyhow::anyhow!("Block pending"))?
                    .as_u64();
                Ok(block_number)
            }
        }
    }
}
impl<P, D, S> EthLocal for Task<P, D, S>
where
    P: Middleware<Error = ProviderError>,
    D: RelayerDb + 'static,
    S: PageSizer + 'static + Send + Sync,
{
    /// The DA height the relayer currently believes it has observed.
    fn observed(&self) -> u64 {
        let height = self.synced.borrow().da_block_height();
        height.into()
    }
}
/// Creates the production relayer service, backed by a majority quorum over
/// the eth clients configured in `config.relayer`.
///
/// # Errors
///
/// Fails when no eth client is configured.
pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
where
    D: RelayerDb + 'static,
{
    let relayer_urls = config.relayer.clone().ok_or_else(|| {
        anyhow::anyhow!(
            "Tried to start Relayer without setting an eth_client in the config"
        )
    })?;
    let providers = relayer_urls
        .into_iter()
        .map(|url| WeightedProvider::new(Http::new(url)));
    let eth_node = Provider::new(QuorumProvider::new(Quorum::Majority, providers));
    // Production keeps running through transient eth-node errors.
    let retry_on_error = true;
    Ok(new_service_internal(
        eth_node,
        database,
        config,
        retry_on_error,
    ))
}
/// Creates a relayer service around the provided (usually mock) eth node.
/// Test services stop on the first error instead of retrying, so failures
/// surface immediately.
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_service_test<P, D>(
    eth_node: P,
    database: D,
    config: Config,
) -> CustomizableService<P, D>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
{
    let retry_on_error = false;
    new_service_internal(eth_node, database, config, retry_on_error)
}
/// Wraps a fresh [`NotInitializedTask`] in a [`ServiceRunner`].
fn new_service_internal<P, D>(
    eth_node: P,
    database: D,
    config: Config,
    retry_on_error: bool,
) -> CustomizableService<P, D>
where
    P: Middleware<Error = ProviderError> + 'static,
    D: RelayerDb + 'static,
{
    CustomizableService::new(NotInitializedTask::new(
        eth_node,
        database,
        config,
        retry_on_error,
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a successful RPC outcome with `n` downloaded logs.
    fn success(n: u64) -> RpcOutcome {
        RpcOutcome::Success { logs_downloaded: n }
    }

    #[test]
    fn adaptive_page_sizer_grows_when_threshold_exceeded() {
        let grow_threshold = 50;
        let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, 10_000);
        // threshold + 1 successes: growth fires on the threshold-th call.
        for _ in 0..=grow_threshold {
            sizer.update(success(100));
        }
        assert_eq!(sizer.page_size(), 5);
    }

    #[test]
    fn adaptive_page_sizer_does_not_grow_if_below_threshold() {
        let grow_threshold = 50;
        let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, 10_000);
        for _ in 0..grow_threshold - 10 {
            sizer.update(success(100));
        }
        assert_eq!(sizer.page_size(), 4);
    }

    #[test]
    fn adaptive_page_sizer_does_not_grow_if_at_max() {
        let grow_threshold = 50;
        // Already at the maximum, so even a long success streak is a no-op.
        let mut sizer = AdaptivePageSizer::new(10, 10, grow_threshold, 10_000);
        for _ in 0..=grow_threshold {
            sizer.update(success(100));
        }
        assert_eq!(sizer.page_size(), 10);
    }

    #[test]
    fn adaptive_page_sizer_shrinks_on_rpc_error() {
        let mut sizer = AdaptivePageSizer::new(6, 10, 50, 10_000);
        sizer.update(RpcOutcome::Error);
        assert_eq!(sizer.page_size(), 3);
    }

    #[test]
    fn adaptive_page_sizer_shrinks_on_excessive_logs() {
        let mut sizer = AdaptivePageSizer::new(6, 10, 50, 100);
        sizer.update(success(101));
        assert_eq!(sizer.page_size(), 3);
    }

    #[test]
    fn adaptive_page_sizer_never_goes_below_one() {
        let mut sizer = AdaptivePageSizer::new(1, 10, 50, 10_000);
        sizer.update(RpcOutcome::Error);
        assert_eq!(sizer.page_size(), 1);
    }

    #[test]
    fn adaptive_page_sizer_resets_successful_calls_after_growth() {
        let grow_threshold = 3;
        let max_logs_per_rpc = 100;
        let mut sizer = AdaptivePageSizer::new(2, 10, grow_threshold, max_logs_per_rpc);
        sizer.update(success(50));
        sizer.update(success(60));
        sizer.update(success(70));
        assert_eq!(sizer.successful_rpc_calls, 0, "Should reset after growth");
    }

    #[test]
    fn adaptive_page_sizer_accumulates_successful_calls_until_threshold() {
        let grow_threshold = 3;
        let max_logs_per_rpc = 100;
        let mut sizer = AdaptivePageSizer::new(4, 10, grow_threshold, max_logs_per_rpc);
        sizer.update(success(20));
        sizer.update(success(25));
        // Two successes: still below the threshold.
        assert_eq!(sizer.page_size(), 4);
        sizer.update(success(30));
        // Third success hits the threshold and grows the page.
        assert_eq!(sizer.page_size(), 5);
    }

    #[test]
    fn adaptive_page_sizer_grows_by_one_if_growth_factor_stalls() {
        let grow_threshold = 50;
        // 2 * 125 / 100 == 2, so multiplicative growth stalls here.
        let mut sizer = AdaptivePageSizer::new(2, 10, grow_threshold, 10_000);
        for _ in 0..=grow_threshold {
            sizer.update(success(100));
        }
        assert_eq!(sizer.page_size(), 3, "Page size should grow by at least 1");
    }

    #[test]
    fn adaptive_page_sizer_shrinks_when_logs_exceed_max_allowed() {
        let grow_threshold = 50;
        let max_logs_per_rpc = 100;
        let mut sizer = AdaptivePageSizer::new(6, 10, grow_threshold, max_logs_per_rpc);
        sizer.update(success(max_logs_per_rpc + 1));
        assert_eq!(
            sizer.page_size(),
            3,
            "Page size should shrink when log count exceeds max_logs_per_rpc"
        );
    }
}