tycho-collator 0.3.9

A collator node.
Documentation
mod adapter_impl;
mod anchor_handler;

use std::sync::Arc;

use anyhow::Result;
use tokio::sync::Mutex;
use tokio::time::MissedTickBehavior;
use tycho_consensus::prelude::*;
use tycho_network::PeerId;
use tycho_util::futures::JoinTask;

use crate::mempool::StateUpdateContext;
use crate::mempool::impls::common::cache::Cache;
use crate::mempool::impls::common::v_set_adapter::VSetAdapter;
use crate::mempool::impls::single_node_impl::anchor_handler::SingleNodeAnchorHandler;
use crate::tracing_targets;

/// Mempool adapter used when the node runs on its own (the `single-node` mode).
///
/// Instead of running consensus with peers, `process_start` spawns a periodic
/// task that takes external messages from `input_buffer` and hands them to a
/// `SingleNodeAnchorHandler`.
pub struct MempoolAdapterSingleNodeImpl {
    /// Shared cache; a clone of this handle is given to the anchor handler on start.
    cache: Arc<Cache>,
    /// Config builder plus the handle of the background task, behind a Tokio mutex.
    config: Mutex<SingleNodeConfigAdapter>,
    /// Id of the local peer; passed to the anchor handler on start.
    local_peer_id: PeerId,

    /// Buffer the background task fetches external messages from.
    input_buffer: InputBuffer,
}

/// Mutable state of the single-node adapter that is kept under the config lock.
struct SingleNodeConfigAdapter {
    /// Builder from which the merged mempool config is produced on start.
    builder: MempoolConfigBuilder,
    /// Handle of the spawned background task; `None` until `process_start` succeeds.
    inner_process: Option<JoinTask<()>>,
}

impl MempoolAdapterSingleNodeImpl {
    pub fn new(mempool_node_config: &MempoolNodeConfig, local_peer_id: PeerId) -> Result<Self> {
        let config_builder = MempoolConfigBuilder::new(mempool_node_config);

        Ok(Self {
            cache: Default::default(),
            config: Mutex::new(SingleNodeConfigAdapter {
                builder: config_builder,
                inner_process: None,
            }),
            local_peer_id,
            input_buffer: InputBuffer::default(),
        })
    }
}

impl MempoolAdapterSingleNodeImpl {
    /// Starts the single-node mempool background task.
    ///
    /// Steps, in order:
    /// 1. build the merged config from `config_guard.builder`;
    /// 2. check the size of the current validator set obtained from `ctx`;
    /// 3. apply the consensus config to the input buffer;
    /// 4. spawn a task that, on every timer tick, fetches external messages
    ///    from the input buffer and passes them to the anchor handler.
    ///
    /// The tick period is `min_round_duration_millis * WAVE_ROUNDS` milliseconds.
    /// The new task handle is stored in `config_guard.inner_process`, replacing
    /// (and thereby dropping) any handle stored there before.
    ///
    /// # Errors
    ///
    /// Returns an error if the config cannot be built, if
    /// `VSetAdapter::init_peers` fails, or if the current validator set has
    /// 2 or more entries. The validator set is checked before the input buffer
    /// is reconfigured and before the anchor handler is created, so a rejected
    /// start does not leave those side effects behind.
    fn process_start(
        &self,
        config_guard: &mut SingleNodeConfigAdapter,
        ctx: &StateUpdateContext,
    ) -> Result<()> {
        let merged_conf = config_guard.builder.build()?;

        // Fail fast: reject unsupported validator set sizes before touching any state.
        match VSetAdapter::init_peers(ctx)?.curr_v_set.len() {
            0 | 1 => {}
            2 => {
                anyhow::bail!(
                    "cannot run mempool with 2 nodes, gen network with either 1 or 3 nodes"
                )
            }
            v_set_len => {
                anyhow::bail!("cannot run {v_set_len} nodes with `single-node` cli flag")
            }
        }

        self.input_buffer.apply_config(&merged_conf.conf.consensus);

        let timeout =
            merged_conf.conf.consensus.min_round_duration_millis().get() * WAVE_ROUNDS as u64;

        let mut interval = tokio::time::interval(std::time::Duration::from_millis(timeout));
        // If the handler runs longer than one period, skip the missed ticks
        // instead of firing them back to back.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

        let input_buffer = self.input_buffer.clone();

        let mut anchor_handler = SingleNodeAnchorHandler::new(
            self.cache.clone(),
            self.local_peer_id,
            ctx.top_processed_to_anchor_id,
            &merged_conf.conf.consensus,
        );

        let task = JoinTask::new(async move {
            // Emitted when the task future is dropped, whatever the reason.
            scopeguard::defer!(tracing::warn!(
                target: tracing_targets::MEMPOOL_ADAPTER,
                "Single node Mempool task stopped"
            ));

            loop {
                interval.tick().await;

                // NOTE(review): the meaning of the `true` flag is defined by
                // `InputBuffer::fetch` — confirm against its documentation.
                let external_messages = input_buffer.fetch(true);

                // `handle` consumes the handler and returns the one to use next.
                anchor_handler = anchor_handler.handle(external_messages).await;
            }
        });
        config_guard.inner_process = Some(task);

        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Single node Mempool task started");

        Ok(())
    }
}