ethexe-observer 2.0.0-pre.1

Ethereum chain observer for the ethexe execution layer
// Copyright (C) Gear Technologies Inc.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

//! Implementation of the on-chain data synchronization.

use crate::{
    RuntimeConfig,
    utils::{BlockLoader, EthereumBlockLoader},
};
use alloy::{
    providers::RootProvider,
    rpc::types::eth::Header,
    transports::{RpcError as AlloyRpcError, TransportErrorKind},
};
use anyhow::{Context as _, anyhow};
use ethexe_common::{
    self, BlockData, BlockHeader, CodeBlobInfo, SimpleBlockData,
    db::{GlobalsStorageRO, GlobalsStorageRW, OnChainStorageRO, OnChainStorageRW},
    events::{BlockEvent, RouterEvent, router::CodeValidationRequestedEvent},
};
use ethexe_db::Database;
use ethexe_ethereum::{
    middleware::{ElectionProvider, MiddlewareQuery},
    router::RouterQuery,
};
use gprimitives::H256;
use std::collections::HashMap;

/// Outcome of one chain-sync attempt. `RpcError` is recoverable (caller
/// retries on the next chain head); `Fatal` propagates.
#[derive(Debug, thiserror::Error)]
pub enum SyncError {
    #[error("RPC error during sync: {0:?}")]
    RpcError(anyhow::Error),
    #[error(transparent)]
    Fatal(anyhow::Error),
}

pub type SyncResult<T> = std::result::Result<T, SyncError>;

type Result<T, E = anyhow::Error> = std::result::Result<T, E>;

impl From<anyhow::Error> for SyncError {
    fn from(err: anyhow::Error) -> Self {
        // `contract::Error::TransportError` is `#[error(transparent)]` +
        // `#[from]`; its `source()` skips the wrapper, so match it directly.
        let is_rpc = err.chain().any(|e| {
            e.downcast_ref::<AlloyRpcError<TransportErrorKind>>()
                .is_some()
                || matches!(
                    e.downcast_ref::<alloy::contract::Error>(),
                    Some(alloy::contract::Error::TransportError(_))
                )
        });
        if is_rpc {
            Self::RpcError(err)
        } else {
            Self::Fatal(err)
        }
    }
}

// TODO #4552: make tests for ChainSync
/// Synchronizes on-chain data into the local [`Database`].
///
/// Bundles the database handle, the observer's runtime configuration, and the
/// Ethereum-side query and loader handles used while syncing.
#[derive(Clone)]
pub(crate) struct ChainSync {
    /// Destination for block headers, events, code blob info, validators and
    /// sync progress.
    pub db: Database,
    /// Observer runtime configuration (contract addresses, sync depths, timelines).
    pub config: RuntimeConfig,
    /// Query handle for the router contract.
    pub router_query: RouterQuery,
    /// Query handle for the middleware contract.
    pub middleware_query: MiddlewareQuery,
    /// Loader used to fetch block data from Ethereum.
    pub block_loader: EthereumBlockLoader,
}

impl ChainSync {
    pub fn new(db: Database, config: RuntimeConfig, provider: RootProvider) -> Self {
        let router_query = RouterQuery::from_provider(config.router_address, provider.clone());
        let middleware_query =
            MiddlewareQuery::from_provider(config.middleware_address, provider.clone());
        let block_loader = EthereumBlockLoader::new(provider, config.router_address);
        Self {
            db,
            config,
            router_query,
            middleware_query,
            block_loader,
        }
    }

    pub async fn sync(self, chain_head: Header) -> SyncResult<H256> {
        let block = SimpleBlockData {
            hash: H256(chain_head.hash.0),
            header: BlockHeader {
                height: chain_head.number as u32,
                timestamp: chain_head.timestamp,
                parent_hash: H256(chain_head.parent_hash.0),
            },
        };

        let blocks_data = self.pre_load_data(&block.header).await?;
        let chain = self.load_chain(&block, blocks_data).await?;
        self.ensure_validators(block).await?;
        self.mark_chain_as_synced(chain.into_iter().rev());

        Ok(block.hash)
    }

    async fn load_chain(
        &self,
        block: &SimpleBlockData,
        mut blocks_data: HashMap<H256, BlockData>,
    ) -> Result<Vec<SimpleBlockData>> {
        let mut chain = Vec::new();

        let mut current_block_hash = block.hash;
        while !self.db.block_synced(current_block_hash) {
            let block_data = match blocks_data.remove(&current_block_hash) {
                Some(data) => data,
                None => {
                    self.block_loader
                        .load(
                            current_block_hash,
                            (current_block_hash == block.hash).then_some(block.header),
                        )
                        .await?
                }
            };

            if current_block_hash != block_data.hash {
                unreachable!(
                    "Expected data for block hash {current_block_hash}, got for {}",
                    block_data.hash
                );
            }

            for event in block_data.events.iter() {
                if let &BlockEvent::Router(RouterEvent::CodeValidationRequested(
                    CodeValidationRequestedEvent {
                        code_id,
                        timestamp,
                        tx_hash,
                    },
                )) = event
                {
                    self.db
                        .set_code_blob_info(code_id, CodeBlobInfo { timestamp, tx_hash });
                }
            }

            self.db
                .set_block_header(current_block_hash, block_data.header);
            self.db
                .set_block_events(current_block_hash, &block_data.events);

            chain.push(SimpleBlockData {
                hash: current_block_hash,
                header: block_data.header,
            });

            current_block_hash = block_data.header.parent_hash;
        }

        Ok(chain)
    }

    /// Loads blocks if there is a gap between the `header`'s height and the latest synced block height.
    async fn pre_load_data(&self, header: &BlockHeader) -> Result<HashMap<H256, BlockData>> {
        let latest_synced_eb_height = self.db.globals().latest_synced_eb.header.height;

        if header.height <= latest_synced_eb_height {
            tracing::warn!(
                "Got a block with number {} <= latest synced block number: {}, maybe a reorg",
                header.height,
                latest_synced_eb_height
            );
            // Suppose here that all data is already in db.
            return Ok(Default::default());
        }

        if (header.height - latest_synced_eb_height) >= self.config.max_sync_depth {
            return Err(anyhow!(
                "Too much to sync: current block number: {}, Latest synced block number: {}, Max depth: {}",
                header.height,
                latest_synced_eb_height,
                self.config.max_sync_depth
            ));
        }

        if header.height - latest_synced_eb_height < self.config.batched_sync_depth {
            // No need to pre load data, because amount of blocks is small enough.
            return Ok(Default::default());
        }

        self.block_loader
            .load_many(latest_synced_eb_height as u64..=header.height as u64)
            .await
    }

    /// This function guarantees the next things:
    /// 1. if there is no validators for current era in database - it fetches them.
    /// 2. if the election result is `finalized` it requests for next era validators and sets them in database.
    ///
    /// See [`Self::election_timestamp_finalized`] for the our timestamp `finalization` rules.
    async fn ensure_validators(&self, block_data: SimpleBlockData) -> Result<()> {
        let chain_head_era = self
            .config
            .timelines
            .era_from_ts(block_data.header.timestamp)
            .context("failed to calculate era from timestamp")?;

        // If we don't have validators for current era - set them.
        if self.db.validators(chain_head_era).is_none() {
            let validators = self.router_query.validators_at(block_data.hash).await?;
            self.db.set_validators(chain_head_era, validators);
        }

        // Fetch next era validators if timestamp `finalized` and we don't set them in database already.
        if let Some(election_ts) = self.election_timestamp_finalized(block_data.header.timestamp)
            && self.db.validators(chain_head_era + 1).is_none()
        {
            let next_era_validators = self
                .middleware_query
                .make_election_at(election_ts, 10)
                .await?;
            self.db
                .set_validators(chain_head_era + 1, next_era_validators);
        }

        Ok(())
    }

    fn mark_chain_as_synced(&self, chain: impl Iterator<Item = SimpleBlockData>) {
        for data in chain {
            let SimpleBlockData { hash, header } = data;

            self.db.set_block_synced(hash);

            log::trace!(
                "✅ block {hash} synced, events: {:?}",
                self.db.block_events(hash)
            );

            self.db
                .globals_mutate(|g| g.latest_synced_eb = SimpleBlockData { hash, header });
        }
    }

    /// Function checks the `election_ts` in current era is `finalized` and if it's true then returns it.
    ///
    /// The `finalization` blocks period set in observer's [`RuntimeConfig`].
    fn election_timestamp_finalized(&self, timestamp: u64) -> Option<u64> {
        let era = self.config.timelines.era_from_ts(timestamp)?;
        let election_ts = self.config.timelines.era_election_start_ts(era)?;
        (timestamp.saturating_sub(election_ts)
            > self.config.timelines.slot.get() * self.config.finalization_period_blocks)
            .then_some(election_ts)
    }
}