tycho-collator 0.3.9

A collator node.
Documentation
use std::sync::Arc;

use anyhow::Result;
use futures_util::never::Never;
use tokio::sync::mpsc;
use tycho_consensus::prelude::{Commit, MempoolAdapterStore, MempoolDb, MempoolOutput, RoundWatch};
use tycho_types::models::ConsensusConfig;

use crate::mempool::impls::common::cache::Cache;
use crate::mempool::impls::common::parser::Parser;
use crate::mempool::impls::common::shuttle::Shuttle;
use crate::tracing_targets;

/// Consumes [`MempoolOutput`] items from a channel and applies them to the shared [`Cache`].
///
/// Built with [`StdAnchorHandler::new`] and driven by [`StdAnchorHandler::run`].
pub struct StdAnchorHandler {
    /// Receiving half of the unbounded channel that delivers mempool outputs.
    anchor_rx: mpsc::UnboundedReceiver<MempoolOutput>,
    /// Watch that is raised (`set_max`) to an anchor's round after that anchor was handled.
    commit_finished: RoundWatch<Commit>,
    /// Shared cache: receives handled anchors and the paused flag.
    cache: Arc<Cache>,
    /// Copied from `ConsensusConfig::deduplicate_rounds`; passed to every `Parser::new` call.
    deduplicate_rounds: u16,
}

impl StdAnchorHandler {
    pub fn new(
        anchor_rx: mpsc::UnboundedReceiver<MempoolOutput>,
        commit_finished: RoundWatch<Commit>,
        cache: &Arc<Cache>,
        config: &ConsensusConfig,
    ) -> Self {
        Self {
            anchor_rx,
            commit_finished,
            cache: cache.clone(),
            deduplicate_rounds: config.deduplicate_rounds,
        }
    }

    pub async fn run(mut self, mempool_db: Arc<MempoolDb>) -> Result<Never> {
        scopeguard::defer!(tracing::warn!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            "handle anchors task stopped"
        ));
        let mut shuttle = Box::new(Shuttle {
            store: MempoolAdapterStore::new(mempool_db),
            parser: Parser::new(self.deduplicate_rounds),
            set_committed_in_db: true,
        });
        while let Some(output) = self.anchor_rx.recv().await {
            shuttle = self.handle_mempool_output(shuttle, output).await?;
        }
        Err(anyhow::Error::msg("anchor channel from mempool is closed"))
    }

    async fn handle_mempool_output(
        &self,
        mut shuttle: Box<Shuttle>,
        output: MempoolOutput,
    ) -> Result<Box<Shuttle>> {
        match output {
            MempoolOutput::Paused(is_paused) => self.cache.set_paused(is_paused),
            MempoolOutput::NextAnchor(adata) => {
                let anchor_round = adata.anchor.round();
                if adata.needs_empty_cache {
                    shuttle.parser = Parser::new(self.deduplicate_rounds);
                    tracing::info!(
                        target: tracing_targets::MEMPOOL_ADAPTER,
                        is_executable = adata.is_executable,
                        "deduplication state dropped",
                    );
                };
                let (output, dirty) = shuttle.handle(adata).await?;
                if let Some(anchor) = output {
                    self.cache.push(Arc::new(anchor))?;
                }
                let shuttle = dirty.clean().await;
                // history payloads are read from DB and marked committed, so ready to be removed
                self.commit_finished.set_max(anchor_round);
                return shuttle;
            }
        };
        Ok(shuttle)
    }
}