// tycho-collator 0.3.9
//
// A collator node.
// Documentation
use anyhow::Result;
use bytes::Bytes;
use tycho_types::models::{ConsensusConfig, GenesisInfo};

use crate::mempool::{
    GetAnchorResult, MempoolAdapter, MempoolAdapterStdImpl, MempoolAnchorId, StateUpdateContext,
};
use crate::tracing_targets;
use crate::types::processed_upto::BlockSeqno;

#[async_trait::async_trait]
impl MempoolAdapter for MempoolAdapterStdImpl {
    /// Accepts a state update context originating from a masterchain block.
    ///
    /// The context is checked against an expected genesis change and then pushed
    /// into the keeper's `state_update_queue`. If the queue hands a context back,
    /// that context is processed right away, `top_known_anchor` is updated with its
    /// `top_processed_to_anchor_id`, and it is stored as `last_state_update`.
    ///
    /// The keeper lock is held from acquisition until the method returns.
    ///
    /// # Errors
    ///
    /// Propagates errors from the genesis change check, from the queue push
    /// and from `process_state_update`.
    ///
    /// # Panics
    ///
    /// Panics if `new_cx.mc_block_id` does not belong to the masterchain.
    async fn handle_mc_state_update(&self, new_cx: Box<StateUpdateContext>) -> Result<()> {
        tracing::debug!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            tka = %new_cx.top_processed_to_anchor_id,
            "Received state update from mc block",
        );
        assert!(new_cx.mc_block_id.is_masterchain(), "expect only MC data");

        // Assumption: first block versions are monotonic by both top anchor and seqno.
        // A second block version may arrive out of that order, but strictly before
        // `handle_top_processed_to_anchor()` is called, and that method is itself
        // called with monotonically increasing anchors.
        let mut keeper = self.keeper.lock().await;

        // The collator won't receive any anchors from the prepare
        // until the block gets signed.
        keeper.check_expect_genesis_change(&self.cache, &new_cx)?;

        // Nothing to apply yet when the queue does not return a context.
        let Some(ready_cx) = keeper.state_update_queue.push(new_cx)? else {
            return Ok(());
        };

        self.process_state_update(&mut keeper, &ready_cx).await?;
        let top_anchor_id = ready_cx.top_processed_to_anchor_id;
        self.top_known_anchor.set_max_raw(top_anchor_id);
        keeper.last_state_update = Some(*ready_cx);

        Ok(())
    }

    /// Applies every state update context that the keeper's queue returns
    /// for the signed masterchain block `mc_block_seqno`.
    ///
    /// Each returned context is processed in iteration order; after each one
    /// `top_known_anchor` is updated with its `top_processed_to_anchor_id`
    /// and the context is stored as `last_state_update`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the queue's `signed` call and from
    /// `process_state_update`; the first failure stops the iteration.
    async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()> {
        let mut keeper = self.keeper.lock().await;

        let signed_updates = keeper.state_update_queue.signed(mc_block_seqno)?;
        for signed_cx in signed_updates {
            self.process_state_update(&mut keeper, &signed_cx).await?;
            let top_anchor_id = signed_cx.top_processed_to_anchor_id;
            self.top_known_anchor.set_max_raw(top_anchor_id);
            keeper.last_state_update = Some(*signed_cx);
        }

        Ok(())
    }

    /// Looks up the anchor with the given id in the cache.
    ///
    /// Returns `GetAnchorResult::Exist` with the anchor when the cache yields one
    /// and `GetAnchorResult::NotExist` otherwise. This method itself never
    /// produces an error.
    async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
        tracing::debug!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            %anchor_id,
            "get_anchor_by_id"
        );

        let Some(found) = self.cache.get_anchor_by_id(anchor_id).await else {
            return Ok(GetAnchorResult::NotExist);
        };

        Ok(GetAnchorResult::Exist(found))
    }

    /// Asks the cache for the anchor that follows `prev_anchor_id`.
    ///
    /// Returns `GetAnchorResult::Exist` with the anchor when the cache yields one
    /// and `GetAnchorResult::NotExist` otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the cache lookup.
    async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
        tracing::debug!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            %prev_anchor_id,
            "get_next_anchor"
        );

        let Some(next) = self.cache.get_next_anchor(prev_anchor_id).await? else {
            return Ok(GetAnchorResult::NotExist);
        };

        Ok(GetAnchorResult::Exist(next))
    }

    /// Forwards `before_anchor_id` to the cache's `clear` and reports success.
    ///
    /// This implementation always returns `Ok(())`.
    fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
        self.cache.clear(before_anchor_id);

        Ok(())
    }

    /// Pushes an external message into the input buffer.
    fn accept_external(&self, message: Bytes) {
        self.input_buffer.push(message);
    }

    /// Stores configuration in the keeper's config builder.
    ///
    /// The consensus config is set only when one is supplied;
    /// the genesis info is always set.
    ///
    /// # Errors
    ///
    /// Propagates the error from setting the consensus config; in that case
    /// the genesis info is not updated.
    async fn update_delayed_config(
        &self,
        consensus_config: Option<&ConsensusConfig>,
        genesis_info: &GenesisInfo,
    ) -> Result<()> {
        let mut keeper = self.keeper.lock().await;

        // Without a supplied config it will be set from mc state after sync.
        if let Some(config) = consensus_config {
            keeper.config_builder.set_consensus_config(config)?;
        }

        keeper.config_builder.set_genesis(*genesis_info);

        Ok(())
    }
}