tycho-collator 0.3.9

A collator node.
Documentation
use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tycho_block_util::message::ExtMsgRepr;
use tycho_types::boc::Boc;
use tycho_types::models::MsgInfo;
use tycho_types::prelude::Load;
use tycho_util::metrics::HistogramGuard;

use crate::mempool::impls::common::deduplicator::Deduplicator;
use crate::mempool::{ExternalMessage, MempoolAnchorId};

/// Turns raw payloads into unique `MsgInfo::ExtIn` messages.
///
/// Uniqueness is tracked by two independent [`Deduplicator`]s, one per hash kind
/// (see the field docs).
pub struct Parser {
    /// Checked against the blake3 hash of each payload's raw bytes.
    blake: Deduplicator,
    /// Checked against the representation hash (`repr_hash`) of each decoded message cell.
    sha: Deduplicator,
}

/// Result of [`Parser::parse_unique`].
pub struct ParserOutput {
    /// Messages that were decoded successfully and passed both deduplication checks.
    pub unique_messages: Vec<Arc<ExternalMessage>>,
    /// Sum of the raw payload lengths, in bytes, of the messages in `unique_messages`.
    pub unique_payload_bytes: usize,
}

impl Parser {
    pub fn new(round_threshold: u16) -> Self {
        Self {
            blake: Deduplicator::new(round_threshold),
            sha: Deduplicator::new(round_threshold),
        }
    }

    pub fn clean(&mut self, anchor_id: MempoolAnchorId) {
        let Self { blake, sha } = self;
        rayon::join(|| blake.clean(anchor_id), || sha.clean(anchor_id));
    }

    pub fn parse_unique<T>(&mut self, anchor_id: MempoolAnchorId, payloads: Vec<T>) -> ParserOutput
    where
        T: AsRef<[u8]> + Send,
    {
        let _guard = HistogramGuard::begin("tycho_mempool_adapter_parse_anchor_history_time");

        let all_bytes_blake = payloads
            .into_par_iter()
            .filter_map(|bytes| {
                (bytes.as_ref().len() <= ExtMsgRepr::MAX_BOC_SIZE)
                    .then(|| (<[u8; 32]>::from(blake3::hash(bytes.as_ref())), bytes))
            })
            .collect::<Vec<_>>();

        let uniq_bytes_blake = all_bytes_blake
            .into_iter()
            .filter(|(blake, _)| self.blake.check_unique(anchor_id, blake))
            .map(|(_, bytes)| bytes)
            .collect::<Vec<_>>();

        let uniq_messages_blake = uniq_bytes_blake
            .into_par_iter()
            .filter_map(|bytes| {
                Self::parse_message_bytes(bytes.as_ref()).map(|cell| (cell, bytes.as_ref().len()))
            })
            .collect::<Vec<_>>();

        let mut unique_payload_bytes = 0;
        let unique_messages = uniq_messages_blake
            .into_iter()
            .filter(|(message, _)| {
                (self.sha).check_unique(anchor_id, message.cell.repr_hash().as_array())
            })
            .map(|(message, bytes_len)| {
                unique_payload_bytes += bytes_len;
                message
            })
            .collect::<Vec<_>>();

        ParserOutput {
            unique_messages,
            unique_payload_bytes,
        }
    }

    fn parse_message_bytes(message: &[u8]) -> Option<Arc<ExternalMessage>> {
        let cell = Boc::decode(message).ok()?;
        if cell.is_exotic() || cell.level() != 0 || cell.repr_depth() > ExtMsgRepr::MAX_REPR_DEPTH {
            return None;
        }

        let mut cs = cell.as_slice_allow_exotic();
        let MsgInfo::ExtIn(info) = MsgInfo::load_from(&mut cs).ok()? else {
            return None;
        };
        Some(Arc::new(ExternalMessage { cell, info }))
    }
}